/*
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*-----------------------------------------------------------------------------
 * Sorted set API
 *----------------------------------------------------------------------------*/

/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */

/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated scores.
 * b) the comparison is not just by key (our 'score') but by satellite data.
 * c) there is a back pointer, so it's a doubly linked list with the back
 * pointers being only at "level 1". This allows traversing the list
 * from tail to head, which is useful for ZREVRANGE. */

#include "../lib/server.h"
#include <math.h>

static int zslLexValueGteMin ( robj *value, zlexrangespec *spec );
static int zslLexValueLteMax ( robj *value, zlexrangespec *spec );

/* Allocate a skiplist node sized for 'level' entries of its level array and
 * store 'score' and 'obj' in it. Nothing else is set here: the level array
 * entries and the backward pointer must be filled in by the caller. */
zskiplistNode *zslCreateNode ( int level, double score, robj *obj )
{
    zskiplistNode *node;

    node = zmalloc(sizeof(*node) + level * sizeof(struct zskiplistLevel));
    node->obj = obj;
    node->score = score;
    return node;
}

zskiplist *zslCreate ( void )
{
    int j;
    zskiplist *zsl;

    zsl = zmalloc (sizeof (*zsl ));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode (ZSKIPLIST_MAXLEVEL, 0, NULL);
    for ( j = 0; j < ZSKIPLIST_MAXLEVEL; j ++ )
    {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

/* Drop the node's reference to its object, then release the node memory. */
void zslFreeNode ( zskiplistNode *node )
{
    robj *held = node->obj;

    decrRefCount(held);
    zfree(node);
}

/* Free the whole skiplist: the header, every node reachable through the
 * level-0 forward links (via zslFreeNode), and the list structure itself. */
void zslFree ( zskiplist *zsl )
{
    zskiplistNode *cur, *following;

    /* Grab the first real node before the header is released. */
    cur = zsl->header->level[0].forward;
    zfree(zsl->header);

    for (; cur != NULL; cur = following) {
        /* Read the link before the node is freed. */
        following = cur->level[0].forward;
        zslFreeNode(cur);
    }
    zfree(zsl);
}

/* Pick a random level for a node that is about to be created.
 * The level starts at 1 and grows by one for every consecutive 16-bit
 * random draw that falls below ZSKIPLIST_P * 0xFFFF, so higher levels are
 * progressively less likely. The result is capped at ZSKIPLIST_MAXLEVEL,
 * giving a value between 1 and ZSKIPLIST_MAXLEVEL (both inclusive). */
int zslRandomLevel ( void )
{
    int level;

    for (level = 1; (random() & 0xFFFF) < (ZSKIPLIST_P * 0xFFFF); level++)
        ; /* keep climbing while the draw succeeds */

    if (level >= ZSKIPLIST_MAXLEVEL)
        return ZSKIPLIST_MAXLEVEL;
    return level;
}

/* Insert a new node holding 'obj' with the given 'score' into the skiplist
 * and return it. Nodes are ordered by score and, among equal scores, by
 * compareStringObjects() on their objects. The pointer 'obj' is stored in
 * the node as passed; no reference count change is made in this function.
 * The caller must guarantee the element is not already present. */
zskiplistNode *zslInsert ( zskiplist *zsl, double score, robj *obj )
{
    zskiplistNode * update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    /* Walk down from the highest level in use. After this loop, update[i]
     * is the last node at level i that sorts before the new element and
     * rank[i] is the number of elements crossed to reach it. */
    x = zsl->header;
    for ( i = zsl->level - 1; i >= 0; i -- )
    {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == ( zsl->level - 1 ) ? 0 : rank[i + 1];
        while ( x->level[i].forward &&
                ( x->level[i].forward->score < score ||
                  ( x->level[i].forward->score == score &&
                    compareStringObjects (x->level[i].forward->obj, obj) < 0 ) ) )
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */
    level = zslRandomLevel ();
    if ( level > zsl->level )
    {
        /* The new node is taller than the list: the header is the
         * predecessor on every newly used level, and its span there is
         * set to the current list length before the splice below. */
        for ( i = zsl->level; i < level; i ++ )
        {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode (level, score, obj);
    /* Splice the node in after update[i] on each of its levels. */
    for ( i = 0; i < level; i ++ )
    {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - ( rank[0] - rank[i] );
        update[i]->level[i].span = ( rank[0] - rank[i] ) + 1;
    }

    /* increment span for untouched levels */
    for ( i = level; i < zsl->level; i ++ )
    {
        update[i]->level[i].span ++;
    }

    /* Fix the level-0 back links; the header is never used as a backward
     * target, and a node with no successor becomes the new tail. */
    x->backward = ( update[0] == zsl->header ) ? NULL : update[0];
    if ( x->level[0].forward )
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length ++;
    return x;
}

/* Unlink node 'x' from the skiplist without freeing it. 'update' must hold,
 * for each level in use, the last node preceding 'x' at that level.
 * Internal function used by zslDelete, zslDeleteByScore and
 * zslDeleteByRank. */
void zslDeleteNode ( zskiplist *zsl, zskiplistNode *x, zskiplistNode **update )
{
    zskiplistNode *after = x->level[0].forward;
    int lvl;

    for (lvl = 0; lvl < zsl->level; lvl++) {
        zskiplistNode *pred = update[lvl];

        if (pred->level[lvl].forward != x) {
            /* This link jumps over x: it now covers one element less. */
            pred->level[lvl].span -= 1;
            continue;
        }
        /* This link points at x: absorb x's span and bypass it. */
        pred->level[lvl].span += x->level[lvl].span - 1;
        pred->level[lvl].forward = x->level[lvl].forward;
    }

    /* Repair the level-0 back link, or move the tail if x was last. */
    if (after != NULL)
        after->backward = x->backward;
    else
        zsl->tail = x->backward;

    /* Shrink the list level while its topmost level is empty. */
    while (zsl->level > 1) {
        if (zsl->header->level[zsl->level - 1].forward != NULL) break;
        zsl->level--;
    }
    zsl->length--;
}

/* Delete the element with matching score and object from the skiplist.
 * Returns 1 if a node was found, unlinked and freed, 0 otherwise. */
int zslDelete ( zskiplist *zsl, double score, robj *obj )
{
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL];
    zskiplistNode *cur = zsl->header;
    zskiplistNode *cand;
    int lvl;

    for (lvl = zsl->level - 1; lvl >= 0; lvl--) {
        for (;;) {
            zskiplistNode *nxt = cur->level[lvl].forward;

            if (nxt == NULL) break;
            if (!(nxt->score < score ||
                  (nxt->score == score &&
                   compareStringObjects(nxt->obj, obj) < 0)))
                break;
            cur = nxt;
        }
        update[lvl] = cur;
    }

    /* Several elements may share the score, so the candidate must match
     * on both score and object. */
    cand = cur->level[0].forward;
    if (cand == NULL || score != cand->score ||
        !equalStringObjects(cand->obj, obj))
        return 0; /* not found */

    zslDeleteNode(zsl, cand, update);
    zslFreeNode(cand);
    return 1;
}

/* Return non-zero if 'value' satisfies the lower bound of 'spec':
 * strictly greater than min when minex is set, otherwise >= min. */
static int zslValueGteMin ( double value, zrangespec *spec )
{
    if (spec->minex)
        return value > spec->min;
    return value >= spec->min;
}

/* Return non-zero if 'value' satisfies the upper bound of 'spec':
 * strictly less than max when maxex is set, otherwise <= max. */
int zslValueLteMax ( double value, zrangespec *spec )
{
    if (spec->maxex)
        return value < spec->max;
    return value <= spec->max;
}

/* Return 1 if some part of the skiplist may fall inside the score range,
 * 0 if the range is empty by construction, the list is empty, or the range
 * lies entirely above the last or below the first element. */
int zslIsInRange ( zskiplist *zsl, zrangespec *range )
{
    zskiplistNode *last, *first;

    /* Ranges that can never contain anything. */
    if (range->min > range->max)
        return 0;
    if (range->min == range->max && (range->minex || range->maxex))
        return 0;

    /* The highest score must reach the lower bound... */
    last = zsl->tail;
    if (last == NULL || !zslValueGteMin(last->score, range))
        return 0;

    /* ...and the lowest score must not exceed the upper bound. */
    first = zsl->header->level[0].forward;
    if (first == NULL || !zslValueLteMax(first->score, range))
        return 0;

    return 1;
}

/* Find the first node whose score is contained in the specified range.
 * Returns NULL when no element is contained in the range. */
zskiplistNode *zslFirstInRange ( zskiplist *zsl, zrangespec *range )
{
    zskiplistNode *cur, *nxt;
    int lvl;

    /* Nothing to search when the whole list is out of range. */
    if (!zslIsInRange(zsl, range))
        return NULL;

    cur = zsl->header;
    for (lvl = zsl->level - 1; lvl >= 0; lvl--) {
        /* Advance while the next node is still below the lower bound. */
        for (nxt = cur->level[lvl].forward;
             nxt && !zslValueGteMin(nxt->score, range);
             nxt = cur->level[lvl].forward)
            cur = nxt;
    }

    /* zslIsInRange() succeeded, so the tail satisfies the lower bound and
     * the walk above cannot have run off the end of the list. */
    cur = cur->level[0].forward;

    /* The candidate still has to respect the upper bound. */
    return zslValueLteMax(cur->score, range) ? cur : NULL;
}

/* Find the last node whose score is contained in the specified range.
 * Returns NULL when no element is contained in the range. */
zskiplistNode *zslLastInRange ( zskiplist *zsl, zrangespec *range )
{
    zskiplistNode *cur, *nxt;
    int lvl;

    /* Nothing to search when the whole list is out of range. */
    if (!zslIsInRange(zsl, range))
        return NULL;

    cur = zsl->header;
    for (lvl = zsl->level - 1; lvl >= 0; lvl--) {
        /* Advance while the next node still respects the upper bound. */
        for (nxt = cur->level[lvl].forward;
             nxt && zslValueLteMax(nxt->score, range);
             nxt = cur->level[lvl].forward)
            cur = nxt;
    }

    /* The node reached still has to respect the lower bound. */
    return zslValueGteMin(cur->score, range) ? cur : NULL;
}

/* Delete all the elements whose score lies between range->min and
 * range->max from the skiplist; each bound is exclusive when the matching
 * minex/maxex flag is set and inclusive otherwise.
 * The function also takes the hash table view of the sorted set so that
 * every removed element is deleted from the hash table too.
 * Returns the number of removed elements. */
unsigned long zslDeleteRangeByScore ( zskiplist *zsl, zrangespec *range, dict *dict )
{
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL];
    zskiplistNode *cur = zsl->header;
    zskiplistNode *following;
    unsigned long removed = 0;
    int lvl, below, inside;

    for (lvl = zsl->level - 1; lvl >= 0; lvl--) {
        for (;;) {
            zskiplistNode *nxt = cur->level[lvl].forward;

            if (nxt == NULL) break;
            if (range->minex)
                below = nxt->score <= range->min;
            else
                below = nxt->score < range->min;
            if (!below) break;
            cur = nxt;
        }
        update[lvl] = cur;
    }

    /* 'cur' is the last node below the range; start from its successor. */
    cur = cur->level[0].forward;

    /* Remove nodes for as long as they respect the upper bound. */
    while (cur != NULL) {
        if (range->maxex)
            inside = cur->score < range->max;
        else
            inside = cur->score <= range->max;
        if (!inside) break;

        following = cur->level[0].forward;
        zslDeleteNode(zsl, cur, update);
        dictDelete(dict, cur->obj);
        zslFreeNode(cur);
        removed++;
        cur = following;
    }
    return removed;
}

/* Delete all the elements whose object falls inside the lexicographic
 * range, removing each of them from the hash table 'dict' as well.
 * Returns the number of removed elements. */
unsigned long zslDeleteRangeByLex ( zskiplist *zsl, zlexrangespec *range, dict *dict )
{
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL];
    zskiplistNode *cur = zsl->header;
    zskiplistNode *following;
    unsigned long removed = 0;
    int lvl;

    for (lvl = zsl->level - 1; lvl >= 0; lvl--) {
        for (;;) {
            zskiplistNode *nxt = cur->level[lvl].forward;

            if (nxt == NULL || zslLexValueGteMin(nxt->obj, range)) break;
            cur = nxt;
        }
        update[lvl] = cur;
    }

    /* 'cur' is the last node before the range; start from its successor. */
    cur = cur->level[0].forward;

    /* Remove nodes for as long as they respect the upper bound. */
    while (cur != NULL && zslLexValueLteMax(cur->obj, range)) {
        following = cur->level[0].forward;
        zslDeleteNode(zsl, cur, update);
        dictDelete(dict, cur->obj);
        zslFreeNode(cur);
        removed++;
        cur = following;
    }
    return removed;
}

/* Delete all the elements with rank between start and end from the
 * skiplist, removing each of them from the hash table 'dict' as well.
 * Start and end are inclusive and must be 1-based.
 * Returns the number of removed elements. */
unsigned long zslDeleteRangeByRank ( zskiplist *zsl, unsigned int start, unsigned int end, dict *dict )
{
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL];
    zskiplistNode *cur = zsl->header;
    zskiplistNode *following;
    unsigned long traversed = 0, removed = 0;
    int lvl;

    /* Stop on the node just before rank 'start' at every level. */
    for (lvl = zsl->level - 1; lvl >= 0; lvl--) {
        while (cur->level[lvl].forward &&
               (traversed + cur->level[lvl].span) < start) {
            traversed += cur->level[lvl].span;
            cur = cur->level[lvl].forward;
        }
        update[lvl] = cur;
    }

    /* 'traversed' becomes the rank of the node under examination. */
    traversed++;
    for (cur = cur->level[0].forward;
         cur != NULL && traversed <= end;
         cur = following) {
        following = cur->level[0].forward;
        zslDeleteNode(zsl, cur, update);
        dictDelete(dict, cur->obj);
        zslFreeNode(cur);
        removed++;
        traversed++;
    }
    return removed;
}

/* Find the rank of an element identified by both score and object.
 * Returns 0 when the element cannot be found, its rank otherwise.
 * The rank is 1-based because the span from zsl->header to the first
 * element is counted. */
unsigned long zslGetRank ( zskiplist *zsl, double score, robj *o )
{
    zskiplistNode *cur = zsl->header;
    unsigned long crossed = 0;
    int lvl;

    for (lvl = zsl->level - 1; lvl >= 0; lvl--) {
        zskiplistNode *nxt = cur->level[lvl].forward;

        while (nxt != NULL &&
               (nxt->score < score ||
                (nxt->score == score &&
                 compareStringObjects(nxt->obj, o) <= 0))) {
            crossed += cur->level[lvl].span;
            cur = nxt;
            nxt = cur->level[lvl].forward;
        }

        /* 'cur' may still be the header, whose obj is NULL. */
        if (cur->obj != NULL && equalStringObjects(cur->obj, o))
            return crossed;
    }
    return 0;
}

/* Find an element by its rank. The rank argument needs to be 1-based.
 * Returns the node reached once exactly 'rank' elements have been crossed
 * (for rank 0 that is the header), or NULL if no such node exists. */
zskiplistNode* zslGetElementByRank ( zskiplist *zsl, unsigned long rank )
{
    zskiplistNode *cur = zsl->header;
    unsigned long crossed = 0;
    int lvl = zsl->level;

    while (lvl-- > 0) {
        while (cur->level[lvl].forward &&
               (crossed + cur->level[lvl].span) <= rank) {
            crossed += cur->level[lvl].span;
            cur = cur->level[lvl].forward;
        }
        if (crossed == rank)
            return cur;
    }
    return NULL;
}

/* Populate the rangespec according to the objects min and max.
 *
 * A bound prefixed by the "(" character is considered "open". For instance
 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max, while
 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max.
 * Integer-encoded objects are used directly as inclusive bounds.
 *
 * Returns C_OK on success, C_ERR if a bound has trailing characters after
 * the number or parses to NaN. */
static int zslParseRange ( robj *min, robj *max, zrangespec *spec )
{
    char *end;
    char *text;
    int open;

    spec->minex = spec->maxex = 0;

    if (min->encoding != OBJ_ENCODING_INT) {
        text = (char *) min->ptr;
        open = (text[0] == '(');
        spec->min = strtod(open ? text + 1 : text, &end);
        if (end[0] != '\0' || isnan(spec->min)) return C_ERR;
        if (open) spec->minex = 1;
    } else {
        spec->min = (long) min->ptr;
    }

    if (max->encoding != OBJ_ENCODING_INT) {
        text = (char *) max->ptr;
        open = (text[0] == '(');
        spec->max = strtod(open ? text + 1 : text, &end);
        if (end[0] != '\0' || isnan(spec->max)) return C_ERR;
        if (open) spec->maxex = 1;
    } else {
        spec->max = (long) max->ptr;
    }

    return C_OK;
}

/* ------------------------ Lexicographic ranges ---------------------------- */

/* Parse max or min argument of ZRANGEBYLEX.
 * (foo means foo (open interval)
 * [foo means foo (closed interval)
 * - means the min string possible
 * + means the max string possible
 *
 * If the string is valid the *dest pointer is set to the redis object
 * that will be used for the comparison, and *ex is set to 1 if the item
 * is exclusive or 0 if it is inclusive. C_OK is returned. For "+" and "-"
 * the shared max/min string object is used and its reference count is
 * incremented; otherwise a new string object is created from the bytes
 * following the prefix character.
 *
 * If the string is not a valid range C_ERR is returned, and the value
 * of *dest and *ex is undefined. */
int zslParseLexRangeItem ( robj *item, robj **dest, int *ex )
{
    char *c = item->ptr;
    char tag = c[0];

    if (tag == '+' || tag == '-') {
        /* The sentinels must be exactly one character long. */
        if (c[1] != '\0') return C_ERR;
        *ex = 0;
        if (tag == '+') {
            *dest = shared.maxstring;
            incrRefCount(shared.maxstring);
        } else {
            *dest = shared.minstring;
            incrRefCount(shared.minstring);
        }
        return C_OK;
    }

    if (tag == '(' || tag == '[') {
        *ex = (tag == '(') ? 1 : 0;
        *dest = createStringObject(c + 1, sdslen(c) - 1);
        return C_OK;
    }

    return C_ERR;
}

/* Populate the lex rangespec according to the objects min and max.
 *
 * Return C_OK on success. On error C_ERR is returned.
 * When C_OK is returned the structure must be freed with zslFreeLexRange(),
 * otherwise no release is needed: any bound parsed before the failure is
 * released here. */
static int zslParseLexRange ( robj *min, robj *max, zlexrangespec *spec )
{
    /* Integer-encoded objects can't be valid: every item must start with
     * ( or [ (or be one of the + / - sentinels). */
    if (min->encoding == OBJ_ENCODING_INT) return C_ERR;
    if (max->encoding == OBJ_ENCODING_INT) return C_ERR;

    spec->min = NULL;
    spec->max = NULL;

    if (zslParseLexRangeItem(min, &spec->min, &spec->minex) != C_ERR &&
        zslParseLexRangeItem(max, &spec->max, &spec->maxex) != C_ERR)
        return C_OK;

    /* Undo whatever half of the range was already built. */
    if (spec->min) decrRefCount(spec->min);
    if (spec->max) decrRefCount(spec->max);
    return C_ERR;
}

/* Release the two bound objects of a lex range. Must be called only after
 * zslParseLexRange() populated the structure with success (C_OK returned). */
void zslFreeLexRange ( zlexrangespec *spec )
{
    robj *lower = spec->min;
    robj *upper = spec->max;

    decrRefCount(lower);
    decrRefCount(upper);
}

/* Wrapper around compareStringObjects() that treats shared.minstring and
 * shared.maxstring as the equivalent of -inf and +inf for strings.
 * Returns 0 when both arguments are the same object, -1 or 1 when one of
 * them is a sentinel, and the compareStringObjects() result otherwise. */
int compareStringObjectsForLexRange ( robj *a, robj *b )
{
    /* Identical pointers cover inf,inf and -inf,-inf up front, so the
     * sentinel checks below never see two equal sentinels. */
    if (a == b)
        return 0;

    if (a == shared.minstring || b == shared.maxstring)
        return -1;
    if (a == shared.maxstring || b == shared.minstring)
        return 1;

    return compareStringObjects(a, b);
}

/* Return non-zero if 'value' satisfies the lower bound of the lex range:
 * strictly after min when minex is set, otherwise at or after min. */
static int zslLexValueGteMin ( robj *value, zlexrangespec *spec )
{
    int cmp = compareStringObjectsForLexRange(value, spec->min);

    if (spec->minex)
        return cmp > 0;
    return cmp >= 0;
}

/* Return non-zero if 'value' satisfies the upper bound of the lex range:
 * strictly before max when maxex is set, otherwise at or before max. */
static int zslLexValueLteMax ( robj *value, zlexrangespec *spec )
{
    int cmp = compareStringObjectsForLexRange(value, spec->max);

    if (spec->maxex)
        return cmp < 0;
    return cmp <= 0;
}

/* Return 1 if some part of the skiplist may fall inside the lex range,
 * 0 if the range is empty by construction, the list is empty, or the range
 * lies entirely after the last or before the first element. */
int zslIsInLexRange ( zskiplist *zsl, zlexrangespec *range )
{
    zskiplistNode *x;
    int cmp;

    /* Test for ranges that will always be empty: min > max, or min == max
     * with at least one exclusive bound. compareStringObjectsForLexRange()
     * returns exactly 1 when a sentinel is involved, so the test must be
     * "> 0" (not "> 1"), and the same sentinel-aware result is used for
     * the equality check so that a user string is never considered equal
     * to the -inf/+inf sentinels. */
    cmp = compareStringObjectsForLexRange (range->min, range->max);
    if ( cmp > 0 || ( cmp == 0 && ( range->minex || range->maxex ) ) )
        return 0;

    /* The last element must reach the lower bound... */
    x = zsl->tail;
    if ( x == NULL || ! zslLexValueGteMin (x->obj, range) )
        return 0;

    /* ...and the first element must not exceed the upper bound. */
    x = zsl->header->level[0].forward;
    if ( x == NULL || ! zslLexValueLteMax (x->obj, range) )
        return 0;
    return 1;
}

/* Find the first node whose object is contained in the specified lex range.
 * Returns NULL when no element is contained in the range. */
zskiplistNode *zslFirstInLexRange ( zskiplist *zsl, zlexrangespec *range )
{
    zskiplistNode *cur, *nxt;
    int lvl;

    /* Nothing to search when the whole list is out of range. */
    if (!zslIsInLexRange(zsl, range))
        return NULL;

    cur = zsl->header;
    for (lvl = zsl->level - 1; lvl >= 0; lvl--) {
        /* Advance while the next node is still before the lower bound. */
        for (nxt = cur->level[lvl].forward;
             nxt && !zslLexValueGteMin(nxt->obj, range);
             nxt = cur->level[lvl].forward)
            cur = nxt;
    }

    /* zslIsInLexRange() succeeded, so the tail satisfies the lower bound
     * and the walk above cannot have run off the end of the list. */
    cur = cur->level[0].forward;

    /* The candidate still has to respect the upper bound. */
    return zslLexValueLteMax(cur->obj, range) ? cur : NULL;
}

/* Find the last node whose object is contained in the specified lex range.
 * Returns NULL when no element is contained in the range. */
zskiplistNode *zslLastInLexRange ( zskiplist *zsl, zlexrangespec *range )
{
    zskiplistNode *cur, *nxt;
    int lvl;

    /* Nothing to search when the whole list is out of range. */
    if (!zslIsInLexRange(zsl, range))
        return NULL;

    cur = zsl->header;
    for (lvl = zsl->level - 1; lvl >= 0; lvl--) {
        /* Advance while the next node still respects the upper bound. */
        for (nxt = cur->level[lvl].forward;
             nxt && zslLexValueLteMax(nxt->obj, range);
             nxt = cur->level[lvl].forward)
            cur = nxt;
    }

    /* The node reached still has to respect the lower bound. */
    return zslLexValueGteMin(cur->obj, range) ? cur : NULL;
}

/*-----------------------------------------------------------------------------
 * Ziplist-backed sorted set API
 *----------------------------------------------------------------------------*/

/* Return the score stored in the ziplist entry 'sptr' as a double.
 * A string entry is copied into a local buffer and parsed with strtod();
 * an integer entry is converted directly. */
double zzlGetScore ( unsigned char *sptr )
{
    /* Initialized so the result is well defined even if ziplistGet()
     * does not write its outputs. */
    unsigned char *vstr = NULL;
    unsigned int vlen = 0;
    long long vlong = 0;
    char buf[128];
    double score;

    ziplistGet (sptr, &vstr, &vlen, &vlong);

    if ( vstr )
    {
        /* Clamp the length so an oversized entry cannot overflow buf;
         * one byte is reserved for the terminator. */
        if ( vlen > sizeof (buf ) - 1 )
            vlen = sizeof (buf ) - 1;
        memcpy (buf, vstr, vlen);
        buf[vlen] = '\0';
        score = strtod (buf, NULL);
    }
    else
    {
        score = vlong;
    }

    return score;
}

/* Return a ziplist element as a newly created Redis string object.
 * This simple abstraction can be used to simplify some code at the
 * cost of some performance. */
robj *ziplistGetObject ( unsigned char *sptr )
{
    unsigned char *bytes;
    unsigned int nbytes;
    long long number;

    ziplistGet(sptr, &bytes, &nbytes, &number);

    /* Integer entries come back with a NULL string pointer. */
    if (bytes == NULL)
        return createStringObjectFromLongLong(number);
    return createStringObject((char *) bytes, nbytes);
}

/* Compare the ziplist element at 'eptr' with the buffer 'cstr' of length
 * 'clen'. The common prefix is compared with memcmp(); when it is equal
 * the difference of the two lengths is returned instead. */
int zzlCompareElements ( unsigned char *eptr, unsigned char *cstr, unsigned int clen )
{
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    unsigned char vbuf[32];
    int shorter, diff;

    ziplistGet(eptr, &vstr, &vlen, &vlong);
    if (vstr == NULL) {
        /* Integer entry: compare against its string representation. */
        vlen = ll2string((char *) vbuf, sizeof(vbuf), vlong);
        vstr = vbuf;
    }

    if (vlen < clen)
        shorter = vlen;
    else
        shorter = clen;

    diff = memcmp(vstr, cstr, shorter);
    if (diff != 0)
        return diff;
    return vlen - clen;
}

/* Return the number of (element,score) pairs stored in the ziplist: the
 * entry count reported by ziplistLen() divided by two. */
unsigned int zzlLength ( unsigned char *zl )
{
    return ziplistLen (zl) / 2;
}

/* Move to the next entry based on the value in *sptr. On return *eptr and
 * *sptr point to the next element and its score; both are set to NULL when
 * there is no next entry. */
void zzlNext ( unsigned char *zl, unsigned char **eptr, unsigned char **sptr )
{
    unsigned char *nextEle = ziplistNext(zl, *sptr);
    unsigned char *nextScore = NULL;

    /* Only look for a score when there actually is a next element. */
    if (nextEle != NULL)
        nextScore = ziplistNext(zl, nextEle);

    *eptr = nextEle;
    *sptr = nextScore;
}

/* Move to the previous entry based on the value in *eptr. On return *eptr
 * and *sptr point to the previous element and its score; both are set to
 * NULL when there is no previous entry. */
void zzlPrev ( unsigned char *zl, unsigned char **eptr, unsigned char **sptr )
{
    unsigned char *prevScore = ziplistPrev(zl, *eptr);
    unsigned char *prevEle = NULL;

    /* Only look for an element when there actually is a previous score. */
    if (prevScore != NULL)
        prevEle = ziplistPrev(zl, prevScore);

    *eptr = prevEle;
    *sptr = prevScore;
}

/* Return 1 if some part of the ziplist-encoded zset may fall inside the
 * score range, 0 otherwise. Should only be used internally by
 * zzlFirstInRange and zzlLastInRange. */
int zzlIsInRange ( unsigned char *zl, zrangespec *range )
{
    unsigned char *entry;
    double score;

    /* Ranges that can never contain anything. */
    if (range->min > range->max)
        return 0;
    if (range->min == range->max && (range->minex || range->maxex))
        return 0;

    /* Last score: the highest one must reach the lower bound. */
    entry = ziplistIndex(zl, -1);
    if (entry == NULL)
        return 0; /* Empty sorted set */
    score = zzlGetScore(entry);
    if (!zslValueGteMin(score, range))
        return 0;

    /* First score: the lowest one must not exceed the upper bound. */
    entry = ziplistIndex(zl, 1);
    score = zzlGetScore(entry);
    if (!zslValueLteMax(score, range))
        return 0;

    return 1;
}

/* Find the pointer to the first element contained in the specified score
 * range. Returns NULL when no element is contained in the range. */
unsigned char *zzlFirstInRange ( unsigned char *zl, zrangespec *range )
{
    unsigned char *eptr = ziplistIndex(zl, 0), *sptr;
    double score;

    /* Nothing to search when the whole list is out of range. */
    if (!zzlIsInRange(zl, range))
        return NULL;

    /* Walk forward pair by pair: element, then its score. */
    for (; eptr != NULL; eptr = ziplistNext(zl, sptr)) {
        sptr = ziplistNext(zl, eptr);
        score = zzlGetScore(sptr);

        if (!zslValueGteMin(score, range))
            continue;

        /* First score at or above min: it is the answer only if it also
         * respects the upper bound. */
        return zslValueLteMax(score, range) ? eptr : NULL;
    }

    return NULL;
}

/* Find the pointer to the last element contained in the specified score
 * range. Returns NULL when no element is contained in the range. */
unsigned char *zzlLastInRange ( unsigned char *zl, zrangespec *range )
{
    unsigned char *eptr = ziplistIndex(zl, -2), *sptr;
    double score;

    /* Nothing to search when the whole list is out of range. */
    if (!zzlIsInRange(zl, range))
        return NULL;

    while (eptr != NULL) {
        sptr = ziplistNext(zl, eptr);
        score = zzlGetScore(sptr);

        /* First score (from the tail) at or below max: it is the answer
         * only if it also respects the lower bound. */
        if (zslValueLteMax(score, range))
            return zslValueGteMin(score, range) ? eptr : NULL;

        /* Step back over the previous pair's score to reach its element;
         * a missing score means there is no previous element either. */
        sptr = ziplistPrev(zl, eptr);
        eptr = (sptr != NULL) ? ziplistPrev(zl, sptr) : NULL;
    }

    return NULL;
}

/* Ziplist variant of zslLexValueGteMin(): the entry at 'p' is turned into
 * a temporary string object, tested against the lower bound of 'spec',
 * and the temporary object is released before returning the result. */
static int zzlLexValueGteMin ( unsigned char *p, zlexrangespec *spec )
{
    robj *tmp;
    int ok;

    tmp = ziplistGetObject(p);
    ok = zslLexValueGteMin(tmp, spec);
    decrRefCount(tmp);
    return ok;
}

/* Ziplist variant of zslLexValueLteMax(): the entry at 'p' is turned into
 * a temporary string object, tested against the upper bound of 'spec',
 * and the temporary object is released before returning the result. */
static int zzlLexValueLteMax ( unsigned char *p, zlexrangespec *spec )
{
    robj *tmp;
    int ok;

    tmp = ziplistGetObject(p);
    ok = zslLexValueLteMax(tmp, spec);
    decrRefCount(tmp);
    return ok;
}

/* Return 1 if some part of the ziplist-encoded zset may fall inside the
 * lex range, 0 otherwise. Should only be used internally by
 * zzlFirstInLexRange and zzlLastInLexRange. */
int zzlIsInLexRange ( unsigned char *zl, zlexrangespec *range )
{
    unsigned char *p;
    int cmp;

    /* Test for ranges that will always be empty: min > max, or min == max
     * with at least one exclusive bound. compareStringObjectsForLexRange()
     * returns exactly 1 when a sentinel is involved, so the test must be
     * "> 0" (not "> 1"), and the same sentinel-aware result is used for
     * the equality check so that a user string is never considered equal
     * to the -inf/+inf sentinels. */
    cmp = compareStringObjectsForLexRange (range->min, range->max);
    if ( cmp > 0 || ( cmp == 0 && ( range->minex || range->maxex ) ) )
        return 0;

    p = ziplistIndex (zl, - 2); /* Last element. */
    if ( p == NULL ) return 0; /* Empty sorted set */
    if ( ! zzlLexValueGteMin (p, range) )
        return 0;

    p = ziplistIndex (zl, 0); /* First element. */
    if ( ! zzlLexValueLteMax (p, range) )
        return 0;

    return 1;
}

/* Find the pointer to the first element contained in the specified lex
 * range. Returns NULL when no element is contained in the range. */
unsigned char *zzlFirstInLexRange ( unsigned char *zl, zlexrangespec *range )
{
    unsigned char *eptr = ziplistIndex(zl, 0), *sptr;

    /* Nothing to search when the whole list is out of range. */
    if (!zzlIsInLexRange(zl, range))
        return NULL;

    while (eptr != NULL) {
        /* First element at or after min: it is the answer only if it also
         * respects the upper bound. */
        if (zzlLexValueGteMin(eptr, range))
            return zzlLexValueLteMax(eptr, range) ? eptr : NULL;

        /* Skip this element's score, then move to the next element. */
        sptr = ziplistNext(zl, eptr);
        eptr = ziplistNext(zl, sptr);
    }

    return NULL;
}

/* Find the pointer to the last element contained in the specified lex
 * range. Returns NULL when no element is contained in the range. */
unsigned char *zzlLastInLexRange ( unsigned char *zl, zlexrangespec *range )
{
    unsigned char *eptr = ziplistIndex(zl, -2), *sptr;

    /* Nothing to search when the whole list is out of range. */
    if (!zzlIsInLexRange(zl, range))
        return NULL;

    while (eptr != NULL) {
        /* First element (from the tail) at or before max: it is the
         * answer only if it also respects the lower bound. */
        if (zzlLexValueLteMax(eptr, range))
            return zzlLexValueGteMin(eptr, range) ? eptr : NULL;

        /* Step back over the previous pair's score to reach its element;
         * a missing score means there is no previous element either. */
        sptr = ziplistPrev(zl, eptr);
        eptr = (sptr != NULL) ? ziplistPrev(zl, sptr) : NULL;
    }

    return NULL;
}

/* Look up 'ele' in the ziplist-encoded zset. Returns a pointer to the
 * matching element entry, or NULL when it is not present. When a match is
 * found and 'score' is not NULL, the element's score is stored in *score.
 * The decoded copy of 'ele' obtained here is released before returning. */
unsigned char *zzlFind ( unsigned char *zl, robj *ele, double *score )
{
    unsigned char *eptr = ziplistIndex(zl, 0), *sptr;
    unsigned char *match = NULL;

    ele = getDecodedObject(ele);
    for (; eptr != NULL; eptr = ziplistNext(zl, sptr)) {
        sptr = ziplistNext(zl, eptr);

        if (!ziplistCompare(eptr, ele->ptr, sdslen(ele->ptr)))
            continue;

        /* Matching element, pull out score. */
        if (score != NULL)
            *score = zzlGetScore(sptr);
        match = eptr;
        break;
    }

    decrRefCount(ele);
    return match;
}

/* Delete the (element,score) pair starting at 'eptr' from the ziplist and
 * return the resulting ziplist pointer. A local cursor is used so that the
 * pointer given as argument is not modified. */
unsigned char *zzlDelete ( unsigned char *zl, unsigned char *eptr )
{
    unsigned char *cursor = eptr;
    int k;

    /* TODO: add function to ziplist API to delete N elements from offset. */
    for (k = 0; k < 2; k++)
        zl = ziplistDelete(zl, &cursor);
    return zl;
}

/* Insert the pair (ele,score) into the ziplist right before the entry
 * 'eptr', or append it at the tail when 'eptr' is NULL. The score is stored
 * as the string produced by d2string. Returns the ziplist pointer, which
 * may differ from 'zl' after the insertions. */
unsigned char *zzlInsertAt ( unsigned char *zl, unsigned char *eptr, robj *ele, double score )
{
    char sbuf[128];
    int slen = d2string (sbuf, sizeof (sbuf ), score);
    size_t pos;

    if ( eptr == NULL )
    {
        /* Append: element first, then its score. */
        zl = ziplistPush (zl, ele->ptr, sdslen (ele->ptr), ZIPLIST_TAIL);
        return ziplistPush (zl, ( unsigned char* ) sbuf, slen, ZIPLIST_TAIL);
    }

    /* Remember the position as an offset: the insertion may re-allocate
     * the ziplist, which would invalidate 'eptr'. */
    pos = eptr - zl;
    zl = ziplistInsert (zl, eptr, ele->ptr, sdslen (ele->ptr));

    /* The score goes right after the freshly inserted element. */
    return ziplistInsert (zl, ziplistNext (zl, zl + pos), ( unsigned char* ) sbuf, slen);
}

/* Insert the pair (ele,score) into the ziplist, keeping it ordered by score
 * and, for equal scores, by element. The caller must guarantee that the
 * element is not already present. Returns the updated ziplist pointer. */
unsigned char *zzlInsert ( unsigned char *zl, robj *ele, double score )
{
    unsigned char *cur = ziplistIndex (zl, 0), *scoreptr;
    int placed = 0;

    ele = getDecodedObject (ele);

    while ( cur != NULL && ! placed )
    {
        double curscore;

        scoreptr = ziplistNext (zl, cur);
        curscore = zzlGetScore (scoreptr);

        /* Take the spot of the first pair that must come after the new one:
         * either a larger score, or the same score with an element that
         * compares greater. */
        if ( curscore > score ||
             ( curscore == score &&
               zzlCompareElements (cur, ele->ptr, sdslen (ele->ptr)) > 0 ) )
        {
            zl = zzlInsertAt (zl, cur, ele, score);
            placed = 1;
        }
        else
        {
            /* Move to next element. */
            cur = ziplistNext (zl, scoreptr);
        }
    }

    /* Every existing pair sorts before the new one: push on the tail. */
    if ( ! placed )
        zl = zzlInsertAt (zl, NULL, ele, score);

    decrRefCount (ele);
    return zl;
}

/* Delete every (element,score) pair whose score lies inside 'range'.
 * When 'deleted' is not NULL it receives the number of removed pairs
 * (0 when nothing is in range). Returns the updated ziplist pointer. */
unsigned char *zzlDeleteRangeByScore ( unsigned char *zl, zrangespec *range, unsigned long *deleted )
{
    unsigned char *cur, *scoreptr;
    unsigned long removed = 0;

    if ( deleted != NULL ) *deleted = 0;

    cur = zzlFirstInRange (zl, range);
    if ( cur == NULL ) return zl;

    /* ziplistDelete leaves 'cur' on the entry following the deleted one, so
     * after removing a pair it already points to the next element. When the
     * tail of the ziplist is deleted, 'cur' points to the sentinel byte and
     * ziplistNext returns NULL, which ends the loop. The loop also stops at
     * the first score that is above the range maximum. */
    while ( ( scoreptr = ziplistNext (zl, cur) ) != NULL &&
            zslValueLteMax (zzlGetScore (scoreptr), range) )
    {
        /* Delete both the element and the score. */
        zl = ziplistDelete (zl, &cur);
        zl = ziplistDelete (zl, &cur);
        removed ++;
    }

    if ( deleted != NULL ) *deleted = removed;
    return zl;
}

/* Delete every (element,score) pair whose element lies inside the
 * lexicographical 'range'. When 'deleted' is not NULL it receives the number
 * of removed pairs (0 when nothing is in range). Returns the updated
 * ziplist pointer. */
unsigned char *zzlDeleteRangeByLex ( unsigned char *zl, zlexrangespec *range, unsigned long *deleted )
{
    unsigned char *cur;
    unsigned long removed = 0;

    if ( deleted != NULL ) *deleted = 0;

    cur = zzlFirstInLexRange (zl, range);
    if ( cur == NULL ) return zl;

    for ( ;; )
    {
        /* When the tail of the ziplist is deleted, 'cur' points to the
         * sentinel byte and ziplistNext returns NULL. */
        if ( ziplistNext (zl, cur) == NULL ) break;

        /* No longer in range. */
        if ( ! zzlLexValueLteMax (cur, range) ) break;

        /* Delete both the element and the score. */
        zl = ziplistDelete (zl, &cur);
        zl = ziplistDelete (zl, &cur);
        removed ++;
    }

    if ( deleted != NULL ) *deleted = removed;
    return zl;
}

/* Delete all the elements with rank between start and end from the ziplist.
 * Start and end are inclusive and 1-based. When 'deleted' is not NULL it
 * receives the number of pairs in the rank range. Returns the ziplist
 * pointer handed back by ziplistDeleteRange. */
unsigned char *zzlDeleteRangeByRank ( unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted )
{
    unsigned int count = ( end - start ) + 1;
    /* Each pair occupies two ziplist entries (element, score), hence the
     * factor 2 on both the starting index and the entry count. */
    unsigned int first = 2 * ( start - 1 );

    if ( deleted ) *deleted = count;
    return ziplistDeleteRange (zl, first, 2 * count);
}

/*-----------------------------------------------------------------------------
 * Common sorted set API
 *----------------------------------------------------------------------------*/

/* Return the number of elements of the sorted set 'zobj' for the ziplist
 * and skiplist encodings; any other encoding yields 0. */
unsigned int zsetLength ( robj *zobj )
{
    if ( zobj->encoding == OBJ_ENCODING_ZIPLIST )
    {
        int n = zzlLength (zobj->ptr);
        return n;
    }

    if ( zobj->encoding == OBJ_ENCODING_SKIPLIST )
    {
        zset *zs = zobj->ptr;
        int n = zs->zsl->length;
        return n;
    }

    return 0;
}

/* Convert the sorted set 'zobj' to the requested 'encoding'.
 *
 * Supported conversions are ziplist -> skiplist and skiplist -> ziplist.
 * The call is a no-op when the object already has the requested encoding,
 * when the target encoding is not the counterpart of the current one, or
 * when the current encoding is neither ziplist nor skiplist.
 *
 * On success the old representation is freed and zobj->ptr / zobj->encoding
 * are updated to the new one. */
void zsetConvert ( robj *zobj, int encoding )
{
    zset *zs;
    zskiplistNode *node, *next;
    robj *ele;
    double score;

    if ( zobj->encoding == encoding ) return;
    if ( zobj->encoding == OBJ_ENCODING_ZIPLIST )
    {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;

        if ( encoding != OBJ_ENCODING_SKIPLIST )
            return;

        zs = zmalloc (sizeof (*zs ));
        zs->dict = dictCreate (&zsetDictType, NULL);
        zs->zsl = zslCreate ();

        /* An empty ziplist has no first entry: do not hand a NULL entry
         * pointer to ziplistNext in that case. */
        eptr = ziplistIndex (zl, 0);
        sptr = ( eptr != NULL ) ? ziplistNext (zl, eptr) : NULL;

        while ( eptr != NULL )
        {
            score = zzlGetScore (sptr);
            ziplistGet (eptr, &vstr, &vlen, &vlong);
            if ( vstr == NULL )
                ele = createStringObjectFromLongLong (vlong);
            else
                ele = createStringObject (( char* ) vstr, vlen);

            /* Has incremented refcount since it was just created. */
            node = zslInsert (zs->zsl, score, ele);
            dictAdd (zs->dict, ele, &node->score);
            incrRefCount (ele); /* Added to dictionary. */
            zzlNext (zl, &eptr, &sptr);
        }

        zfree (zobj->ptr);
        zobj->ptr = zs;
        zobj->encoding = OBJ_ENCODING_SKIPLIST;
    }
    else if ( zobj->encoding == OBJ_ENCODING_SKIPLIST )
    {
        unsigned char *zl;

        if ( encoding != OBJ_ENCODING_ZIPLIST )
            return;

        /* Allocate the target ziplist only once the conversion is known to
         * happen, so the early return above cannot leak it. */
        zl = ziplistNew ();

        /* Approach similar to zslFree(), since we want to free the skiplist at
         * the same time as creating the ziplist. */
        zs = zobj->ptr;
        dictRelease (zs->dict);
        node = zs->zsl->header->level[0].forward;
        zfree (zs->zsl->header);
        zfree (zs->zsl);

        while ( node )
        {
            /* Nodes are visited in skiplist order, so appending at the tail
             * keeps the ziplist sorted. */
            ele = getDecodedObject (node->obj);
            zl = zzlInsertAt (zl, NULL, ele, node->score);
            decrRefCount (ele);

            next = node->level[0].forward;
            zslFreeNode (node);
            node = next;
        }

        zfree (zs);
        zobj->ptr = zl;
        zobj->encoding = OBJ_ENCODING_ZIPLIST;
    }
    else
    {
        return;
    }
}

/* Switch the sorted set object to the ziplist encoding when it is not
 * already a ziplist and both its element count and 'maxelelen' (the maximum
 * element size, supplied by the caller) are within the configured
 * server.zset_max_ziplist_entries / server.zset_max_ziplist_value limits. */
void zsetConvertToZiplistIfNeeded ( robj *zobj, size_t maxelelen )
{
    zset *zs;

    if ( zobj->encoding == OBJ_ENCODING_ZIPLIST ) return;

    zs = zobj->ptr;
    if ( zs->zsl->length > server.zset_max_ziplist_entries ) return;
    if ( maxelelen > server.zset_max_ziplist_value ) return;

    zsetConvert (zobj, OBJ_ENCODING_ZIPLIST);
}

/* Return (by reference) the score of the specified member of the sorted set
 * storing it into *score. If the element does not exist C_ERR is returned
 * otherwise C_OK is returned and *score is correctly populated.
 * If 'zobj' or 'member' is NULL, or the object has an encoding other than
 * ziplist / skiplist, C_ERR is returned.
 *
 * 'score' may be NULL when the caller only needs to know whether the member
 * exists: both encodings then skip the store. */
int zsetScore ( robj *zobj, robj *member, double *score )
{
    if ( ! zobj || ! member ) return C_ERR;

    if ( zobj->encoding == OBJ_ENCODING_ZIPLIST )
    {
        /* zzlFind stores the score itself, and only when 'score' != NULL. */
        if ( zzlFind (zobj->ptr, member, score) == NULL ) return C_ERR;
    }
    else if ( zobj->encoding == OBJ_ENCODING_SKIPLIST )
    {
        zset *zs = zobj->ptr;
        dictEntry *de = dictFind (zs->dict, member);
        if ( de == NULL ) return C_ERR;
        /* Mirror the ziplist path: tolerate a NULL output pointer. */
        if ( score != NULL ) *score = * ( double* ) dictGetVal (de);
    }
    else
    {
        return C_ERR;
    }
    return C_OK;
}

/*-----------------------------------------------------------------------------
 * Sorted set commands
 *----------------------------------------------------------------------------*/

/* This generic command implements both ZADD and ZINCRBY. */
#define ZADD_NONE 0
#define ZADD_INCR (1<<0)    /* Increment the score instead of setting it. */
#define ZADD_NX (1<<1)      /* Only add new elements, skip existing ones. */
#define ZADD_XX (1<<2)      /* Only touch elements already existing. */
#define ZADD_CH (1<<3)      /* Return num of elements added or updated. */

/* Generic implementation of ZADD / ZINCRBY.
 *
 * 'flags' is a combination of ZADD_* bits preset by the caller; the NX, XX,
 * CH and INCR options found after the key in the command arguments are
 * OR-ed into it. The remaining arguments must be one or more score-element
 * pairs (exactly one pair when INCR is active).
 *
 * Returns C_ERR, without replying, when the arguments are malformed (odd or
 * zero number of score/element arguments, NX together with XX, INCR with
 * more than one pair) or when the object has an unknown encoding; returns
 * C_OK otherwise.
 * NOTE(review): a score that fails to parse and a key holding a non-zset
 * value also end the command without a reply but return C_OK; that is kept
 * as-is here, confirm it is intended. */
int zaddGenericCommand ( client *c, int flags )
{
    robj *key = c->argv[1];
    robj *ele;
    robj *zobj;
    robj *curobj;
    double score = 0, *scores = NULL, curscore = 0.0;
    int j, elements;
    int scoreidx = 0;
    int retval = C_OK; /* Value returned through the 'cleanup' label. */
    /* The following vars are used in order to track what the command actually
     * did during the execution, to reply to the client and to trigger the
     * notification of keyspace change. */
    int added = 0; /* Number of new elements added. */
    int updated = 0; /* Number of elements with updated score. */
    int processed = 0; /* Number of elements processed, may remain zero with
                           options like XX. */

    /* Parse options. At the end 'scoreidx' is set to the argument position
     * of the score of the first score-element pair. */
    scoreidx = 2;
    while ( scoreidx < c->argc )
    {
        char *opt = c->argv[scoreidx]->ptr;
        if ( ! strcasecmp (opt, "nx") ) flags |= ZADD_NX;
        else if ( ! strcasecmp (opt, "xx") ) flags |= ZADD_XX;
        else if ( ! strcasecmp (opt, "ch") ) flags |= ZADD_CH;
        else if ( ! strcasecmp (opt, "incr") ) flags |= ZADD_INCR;
        else break;
        scoreidx ++;
    }

    /* Turn options into simple to check vars. */
    int incr = ( flags & ZADD_INCR ) != 0;
    int nx = ( flags & ZADD_NX ) != 0;
    int xx = ( flags & ZADD_XX ) != 0;
    int ch = ( flags & ZADD_CH ) != 0;

    /* After the options, we expect to have an even, non-zero number of
     * args, since we expect one or more score-element pairs. Without at
     * least one pair the key creation below would read argv past the
     * supplied arguments and could add an empty sorted set to the db. */
    elements = c->argc - scoreidx;
    if ( elements % 2 || elements == 0 )
    {
        return C_ERR;
    }
    elements /= 2; /* Now this holds the number of score-element pairs. */

    /* Check for incompatible options. */
    if ( nx && xx )
    {
        return C_ERR;
    }

    if ( incr && elements > 1 )
    {
        return C_ERR;
    }

    /* Start parsing all the scores, we need to emit any syntax error
     * before executing additions to the sorted set, as the command should
     * either execute fully or nothing at all. */
    scores = zmalloc (sizeof (double )*elements);
    for ( j = 0; j < elements; j ++ )
    {
        if ( getDoubleFromObject (c->argv[scoreidx + j * 2], &scores[j])
             != C_OK ) goto cleanup;
    }

    /* Lookup the key and create the sorted set if does not exist. */
    zobj = lookupKeyWrite (c->db, key);
    if ( zobj == NULL )
    {
        if ( xx ) goto reply_to_client; /* No key + XX option: nothing to do. */
        /* Pick the initial encoding from the configured ziplist limits and
         * the length of the first element. */
        if ( server.zset_max_ziplist_entries == 0 ||
             server.zset_max_ziplist_value < sdslen (c->argv[scoreidx + 1]->ptr) )
        {
            zobj = createZsetObject ();
        }
        else
        {
            zobj = createZsetZiplistObject ();
        }
        dbAdd (c->db, key, zobj);
    }
    else
    {
        if ( zobj->type != OBJ_ZSET )
        {
            goto cleanup;
        }
    }

    for ( j = 0; j < elements; j ++ )
    {
        score = scores[j];

        if ( zobj->encoding == OBJ_ENCODING_ZIPLIST )
        {
            unsigned char *eptr;

            /* Prefer non-encoded element when dealing with ziplists. */
            ele = c->argv[scoreidx + 1 + j * 2];
            if ( ( eptr = zzlFind (zobj->ptr, ele, &curscore) ) != NULL )
            {
                if ( nx ) continue;
                if ( incr )
                {
                    score += curscore;
                }

                /* Remove and re-insert when score changed. */
                if ( score != curscore )
                {
                    zobj->ptr = zzlDelete (zobj->ptr, eptr);
                    zobj->ptr = zzlInsert (zobj->ptr, ele, score);
                    updated ++;
                }
                processed ++;
            }
            else if ( ! xx )
            {
                /* Optimize: check if the element is too large or the list
                 * becomes too long *before* executing zzlInsert. */
                zobj->ptr = zzlInsert (zobj->ptr, ele, score);
                if ( zzlLength (zobj->ptr) > server.zset_max_ziplist_entries )
                    zsetConvert (zobj, OBJ_ENCODING_SKIPLIST);
                /* zsetConvert is a no-op if the conversion already happened
                 * on the previous line. */
                if ( sdslen (ele->ptr) > server.zset_max_ziplist_value )
                    zsetConvert (zobj, OBJ_ENCODING_SKIPLIST);
                added ++;
                processed ++;
            }
        }
        else if ( zobj->encoding == OBJ_ENCODING_SKIPLIST )
        {
            zset *zs = zobj->ptr;
            zskiplistNode *znode;
            dictEntry *de;

            ele = c->argv[scoreidx + 1 + j * 2] =
                tryObjectEncoding (c->argv[scoreidx + 1 + j * 2]);
            de = dictFind (zs->dict, ele);
            if ( de != NULL )
            {
                if ( nx ) continue;
                curobj = dictGetKey (de);
                curscore = * ( double* ) dictGetVal (de);

                if ( incr )
                {
                    score += curscore;
                }

                /* Remove and re-insert when score changed. We can safely
                 * delete the key object from the skiplist, since the
                 * dictionary still has a reference to it. */
                if ( score != curscore )
                {
                    zslDelete (zs->zsl, curscore, curobj);
                    znode = zslInsert (zs->zsl, score, curobj);
                    incrRefCount (curobj); /* Re-inserted in skiplist. */
                    dictGetVal (de) = & znode->score; /* Update score ptr. */
                    updated ++;
                }
                processed ++;
            }
            else if ( ! xx )
            {
                znode = zslInsert (zs->zsl, score, ele);
                incrRefCount (ele); /* Inserted in skiplist. */
                dictAdd (zs->dict, ele, &znode->score);
                incrRefCount (ele); /* Added to dictionary. */
                added ++;
                processed ++;
            }
        }
        else
        {
            /* Unknown encoding: report the error, but leave through
             * 'cleanup' so that the scores array is released. */
            retval = C_ERR;
            goto cleanup;
        }
    }

reply_to_client:
    if ( incr )
    { /* ZINCRBY or INCR option. */
        if ( processed )
            addReplyDouble (c, score);
    }
    else
    { /* ZADD. */
        addReplyLongLong (c, ch ? added + updated : added);
    }

cleanup:
    zfree (scores);
    return retval;
}

/* ZADD entry point: runs the generic ZADD/ZINCRBY implementation with no
 * preset flags and forwards its return value. */
int zaddCommand ( client *c )
{
    const int flags = ZADD_NONE;

    return zaddGenericCommand (c, flags);
}

/* ZINCRBY entry point: runs the generic ZADD/ZINCRBY implementation with
 * the increment flag preset and forwards its return value. */
int zincrbyCommand ( client *c )
{
    const int flags = ZADD_INCR;

    return zaddGenericCommand (c, flags);
}

/* ZREM implementation: remove the members given in c->argv[2..] from the
 * sorted set stored at key c->argv[1], and reply with the number of members
 * actually removed. The key itself is deleted from the db as soon as the
 * sorted set becomes empty.
 *
 * Returns C_ERR when the key does not exist, when checkType() rejects the
 * object, or when the object has an unknown encoding; C_OK after the reply
 * has been emitted. */
int zremCommand ( client *c )
{
    robj *key = c->argv[1];
    robj *zobj;
    int deleted = 0, j;

    if ( ( zobj = lookupKeyWrite (c->db, key) ) == NULL ||
         checkType (c, zobj, OBJ_ZSET) ) return C_ERR;

    if ( zobj->encoding == OBJ_ENCODING_ZIPLIST )
    {
        unsigned char *eptr;

        for ( j = 2; j < c->argc; j ++ )
        {
            if ( ( eptr = zzlFind (zobj->ptr, c->argv[j], NULL) ) != NULL )
            {
                deleted ++;
                zobj->ptr = zzlDelete (zobj->ptr, eptr);
                if ( zzlLength (zobj->ptr) == 0 )
                {
                    /* The set is empty: drop the key and stop, there is
                     * nothing left to remove members from. */
                    dbDelete (c->db, key);
                    break;
                }
            }
        }
    }
    else if ( zobj->encoding == OBJ_ENCODING_SKIPLIST )
    {
        zset *zs = zobj->ptr;
        dictEntry *de;
        double score;

        for ( j = 2; j < c->argc; j ++ )
        {
            de = dictFind (zs->dict, c->argv[j]);
            if ( de != NULL )
            {
                deleted ++;

                /* Delete from the skiplist */
                score = * ( double* ) dictGetVal (de);
                zslDelete (zs->zsl, score, c->argv[j]);

                /* Delete from the hash table */
                dictDelete (zs->dict, c->argv[j]);
                if ( htNeedsResize (zs->dict) ) dictResize (zs->dict);
                if ( dictSize (zs->dict) == 0 )
                {
                    /* The set is empty: drop the key and stop. */
                    dbDelete (c->db, key);
                    break;
                }
            }
        }
    }
    else
    {
        return C_ERR;
    }

    addReplyLongLong (c, deleted);
    return C_OK;
}

/* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands. */
#define ZRANGE_RANK 0
#define ZRANGE_SCORE 1
#define ZRANGE_LEX 2

/* Generic range deletion. 'rangetype' is one of the ZRANGE_* constants and
 * selects how c->argv[2] / c->argv[3] are interpreted: rank indexes (negative
 * values count from the end of the set), a score range, or a lexicographical
 * range. Replies with the number of deleted elements and deletes the key
 * when the sorted set becomes empty.
 *
 * Returns C_ERR when the range cannot be parsed or the object has an unknown
 * encoding. Returns C_OK otherwise, including the cases that end without a
 * reply from this function: missing key, checkType() rejecting the object,
 * empty rank range. */
int zremrangeGenericCommand ( client *c, int rangetype )
{
    robj *key = c->argv[1];
    robj *zobj;
    unsigned long deleted = 0;
    zrangespec range;
    zlexrangespec lexrange;
    long start, end, llen;

    /* Step 1: Parse the range. */
    if ( rangetype == ZRANGE_RANK )
    {
        if ( ( getLongFromObject (c->argv[2], &start) != C_OK ) ||
             ( getLongFromObject (c->argv[3], &end) != C_OK ) )
            return C_ERR;
    }
    else if ( rangetype == ZRANGE_SCORE )
    {
        if ( zslParseRange (c->argv[2], c->argv[3], &range) != C_OK )
        {
            return C_ERR;
        }
    }
    else if ( rangetype == ZRANGE_LEX )
    {
        if ( zslParseLexRange (c->argv[2], c->argv[3], &lexrange) != C_OK )
        {
            return C_ERR;
        }
    }

    /* Step 2: Lookup & range sanity checks if needed. */
    if ( ( zobj = lookupKeyWrite (c->db, key) ) == NULL ||
         checkType (c, zobj, OBJ_ZSET) ) goto cleanup;

    if ( rangetype == ZRANGE_RANK )
    {
        /* Sanitize indexes. */
        llen = zsetLength (zobj);
        if ( start < 0 ) start = llen + start;
        if ( end < 0 ) end = llen + end;
        if ( start < 0 ) start = 0;

        /* Invariant: start >= 0, so this test will be true when end < 0.
         * The range is empty when start > end or start >= length. */
        if ( start > end || start >= llen )
        {
            goto cleanup;
        }
        if ( end >= llen ) end = llen - 1;
    }

    /* Step 3: Perform the range deletion operation. Ranks are 0-based here
     * while the DeleteRangeByRank helpers are called with 1-based ranks,
     * hence the +1. */
    if ( zobj->encoding == OBJ_ENCODING_ZIPLIST )
    {
        switch ( rangetype )
        {
            case ZRANGE_RANK:
                zobj->ptr = zzlDeleteRangeByRank (zobj->ptr, start + 1, end + 1, &deleted);
                break;
            case ZRANGE_SCORE:
                zobj->ptr = zzlDeleteRangeByScore (zobj->ptr, &range, &deleted);
                break;
            case ZRANGE_LEX:
                zobj->ptr = zzlDeleteRangeByLex (zobj->ptr, &lexrange, &deleted);
                break;
        }
        if ( zzlLength (zobj->ptr) == 0 )
        {
            dbDelete (c->db, key);
        }
    }
    else if ( zobj->encoding == OBJ_ENCODING_SKIPLIST )
    {
        zset *zs = zobj->ptr;
        switch ( rangetype )
        {
            case ZRANGE_RANK:
                deleted = zslDeleteRangeByRank (zs->zsl, start + 1, end + 1, zs->dict);
                break;
            case ZRANGE_SCORE:
                deleted = zslDeleteRangeByScore (zs->zsl, &range, zs->dict);
                break;
            case ZRANGE_LEX:
                deleted = zslDeleteRangeByLex (zs->zsl, &lexrange, zs->dict);
                break;
        }
        if ( htNeedsResize (zs->dict) ) dictResize (zs->dict);
        if ( dictSize (zs->dict) == 0 )
        {
            dbDelete (c->db, key);
        }
    }
    else
    {
        /* Unknown encoding: still release the parsed lex range, exactly as
         * the 'cleanup' path does, before reporting the error. */
        if ( rangetype == ZRANGE_LEX ) zslFreeLexRange (&lexrange);
        return C_ERR;
    }

    /* Step 4: Reply. */

    addReplyLongLong (c, deleted);

cleanup:
    if ( rangetype == ZRANGE_LEX ) zslFreeLexRange (&lexrange);
    return C_OK;
}

/* ZREMRANGEBYRANK entry point: generic range removal in rank mode. */
int zremrangebyrankCommand ( client *c )
{
    const int rangetype = ZRANGE_RANK;

    return zremrangeGenericCommand (c, rangetype);
}

/* ZREMRANGEBYSCORE entry point: generic range removal in score mode. */
int zremrangebyscoreCommand ( client *c )
{
    const int rangetype = ZRANGE_SCORE;

    return zremrangeGenericCommand (c, rangetype);
}

/* ZREMRANGEBYLEX entry point: generic range removal in lexicographical
 * mode. */
int zremrangebylexCommand ( client *c )
{
    const int rangetype = ZRANGE_LEX;

    return zremrangeGenericCommand (c, rangetype);
}

/* One input source of a union/intersection operation, together with the
 * iteration state used by the zui* helpers. Which member of 'iter' is in
 * use depends on 'type' and 'encoding'. */
typedef struct
{
    robj *subject; /* Source object, NULL when the key was not found: the
                    * zui* helpers then treat the source as empty. */
    int type; /* Set, sorted set */
    int encoding; /* Encoding of 'subject', copied from the object. */
    double weight; /* Weight associated with this source. */

    union
    {

        /* Set iterators. */
        union _iterset
        {

            /* Intset encoded set. */
            struct
            {
                intset *is;
                int ii; /* Position of the next member to read. */
            } is;

            /* Hash table encoded set. */
            struct
            {
                dict *dict;
                dictIterator *di; /* Released by zuiClearIterator. */
                dictEntry *de; /* Entry the next zuiNext call returns. */
            } ht;
        } set;

        /* Sorted set iterators. */
        union _iterzset
        {

            /* Ziplist encoded sorted set. */
            struct
            {
                unsigned char *zl;
                unsigned char *eptr, *sptr; /* Element and score entries of
                                             * the next pair to return. */
            } zl;

            /* Skiplist encoded sorted set. */
            struct
            {
                zset *zs;
                zskiplistNode *node; /* Node the next zuiNext call returns. */
            } sl;
        } zset;
    } iter;
} zsetopsrc;


/* Use dirty flags for pointers that need to be cleaned up in the next
 * iteration over the zsetopval. The dirty flag for the long long value is
 * special, since long long values don't need cleanup. Instead, it means that
 * we already checked that "ell" holds a long long, or tried to convert another
 * representation into a long long value. When this was successful,
 * OPVAL_VALID_LL is set as well. */
#define OPVAL_DIRTY_ROBJ 1 /* "ele" was created by zuiObjectFromValue and is
                            * released by the next zuiNext call. */
#define OPVAL_DIRTY_LL 2   /* Conversion to long long already attempted. */
#define OPVAL_VALID_LL 4   /* "ell" holds a valid long long. */

/* Store value retrieved from the iterator. zuiNext zeroes the whole
 * structure and then fills in whichever representation the source provides
 * ("ele", "estr"/"elen" or "ell") plus the score; the zui*FromValue helpers
 * derive the other representations on demand. */
typedef struct
{
    int flags; /* Combination of OPVAL_* bits. */
    unsigned char _buf[32]; /* Private buffer. */
    robj *ele; /* Value as an object, or NULL. */
    unsigned char *estr; /* Value as a string buffer, or NULL; may point
                          * to _buf after zuiBufferFromValue. */
    unsigned int elen; /* Length of estr. */
    long long ell; /* Value as an integer. */
    double score; /* Score of the value; 1.0 for members of plain sets. */
} zsetopval;

/* Short names for the iterator unions declared inside zsetopsrc. */
typedef union _iterset iterset;
typedef union _iterzset iterzset;

/* Position the iterator state of 'op' on the first element of its source.
 * Nothing is done when the source has no subject, or when its type/encoding
 * pair is not one of: intset or hash table set, ziplist or skiplist sorted
 * set. */
void zuiInitIterator ( zsetopsrc *op )
{
    if ( op->subject == NULL ) return;

    if ( op->type == OBJ_SET )
    {
        iterset *set = & op->iter.set;

        if ( op->encoding == OBJ_ENCODING_INTSET )
        {
            set->is.is = op->subject->ptr;
            set->is.ii = 0;
            return;
        }
        if ( op->encoding == OBJ_ENCODING_HT )
        {
            /* Fetch the first entry right away: zuiNext always returns the
             * entry stored here and then advances. */
            set->ht.dict = op->subject->ptr;
            set->ht.di = dictGetIterator (op->subject->ptr);
            set->ht.de = dictNext (set->ht.di);
        }
        return;
    }

    if ( op->type == OBJ_ZSET )
    {
        iterzset *zs = & op->iter.zset;

        if ( op->encoding == OBJ_ENCODING_ZIPLIST )
        {
            zs->zl.zl = op->subject->ptr;
            zs->zl.eptr = ziplistIndex (zs->zl.zl, 0);
            /* The score pointer is only set when there is a first element. */
            if ( zs->zl.eptr != NULL )
                zs->zl.sptr = ziplistNext (zs->zl.zl, zs->zl.eptr);
            return;
        }
        if ( op->encoding == OBJ_ENCODING_SKIPLIST )
        {
            zs->sl.zs = op->subject->ptr;
            zs->sl.node = zs->sl.zs->zsl->header->level[0].forward;
        }
        return;
    }
}

/* Release whatever zuiInitIterator acquired for 'op'. Only the hash table
 * set iterator needs releasing (its dictIterator); every other type/encoding
 * combination, as well as a source without subject, requires no work. */
void zuiClearIterator ( zsetopsrc *op )
{
    if ( op->subject == NULL ) return;

    if ( op->type == OBJ_SET && op->encoding == OBJ_ENCODING_HT )
        dictReleaseIterator (op->iter.set.ht.di);
}

/* Return the number of elements of the source 'op': 0 when it has no
 * subject, C_ERR when its type or encoding is not one of the supported
 * set / sorted set representations. */
int zuiLength ( zsetopsrc *op )
{
    if ( op->subject == NULL ) return 0;

    if ( op->type == OBJ_SET )
    {
        if ( op->encoding == OBJ_ENCODING_INTSET )
            return intsetLen (op->subject->ptr);

        if ( op->encoding == OBJ_ENCODING_HT )
        {
            dict *ht = op->subject->ptr;
            return dictSize (ht);
        }
        return C_ERR;
    }

    if ( op->type == OBJ_ZSET )
    {
        if ( op->encoding == OBJ_ENCODING_ZIPLIST )
            return zzlLength (op->subject->ptr);

        if ( op->encoding == OBJ_ENCODING_SKIPLIST )
        {
            zset *zs = op->subject->ptr;
            return zs->zsl->length;
        }
        return C_ERR;
    }

    return C_ERR;
}

/* Fetch the current element of the source 'op' into 'val' and advance the
 * iterator. Returns 1 when a value was stored, 0 when the end of the source
 * was reached (or the source has no subject / an unsupported type or
 * encoding), in which case the caller can stop iterating.
 *
 * 'val' is recycled across calls: an object created for the previous value
 * by zuiObjectFromValue is released first, then the structure is zeroed. */
int zuiNext ( zsetopsrc *op, zsetopval *val )
{
    if ( op->subject == NULL ) return 0;

    if ( val->flags & OPVAL_DIRTY_ROBJ ) decrRefCount (val->ele);
    memset (val, 0, sizeof (*val ));

    if ( op->type == OBJ_SET )
    {
        iterset *set = & op->iter.set;

        if ( op->encoding == OBJ_ENCODING_INTSET )
        {
            int64_t member;

            if ( ! intsetGet (set->is.is, set->is.ii, &member) ) return 0;
            val->ell = member;
            val->score = 1.0; /* Plain set members all score 1. */
            set->is.ii ++;
            return 1;
        }
        if ( op->encoding == OBJ_ENCODING_HT )
        {
            if ( set->ht.de == NULL ) return 0;
            val->ele = dictGetKey (set->ht.de);
            val->score = 1.0;
            set->ht.de = dictNext (set->ht.di);
            return 1;
        }
        return 0;
    }

    if ( op->type == OBJ_ZSET )
    {
        iterzset *zs = & op->iter.zset;

        if ( op->encoding == OBJ_ENCODING_ZIPLIST )
        {
            /* No need to check both, but better be explicit. */
            if ( zs->zl.eptr == NULL || zs->zl.sptr == NULL ) return 0;
            ziplistGet (zs->zl.eptr, &val->estr, &val->elen, &val->ell);
            val->score = zzlGetScore (zs->zl.sptr);
            zzlNext (zs->zl.zl, &zs->zl.eptr, &zs->zl.sptr);
            return 1;
        }
        if ( op->encoding == OBJ_ENCODING_SKIPLIST )
        {
            if ( zs->sl.node == NULL ) return 0;
            val->ele = zs->sl.node->obj;
            val->score = zs->sl.node->score;
            zs->sl.node = zs->sl.node->level[0].forward;
            return 1;
        }
        return 0;
    }

    return 0;
}

/* Try to obtain a long long representation of the value in 'val', storing
 * it in val->ell.
 *
 * The conversion is attempted only once per value: OPVAL_DIRTY_LL records
 * that it was tried, OPVAL_VALID_LL that it succeeded. Returns non-zero when
 * val->ell holds a valid long long and 0 otherwise, so the result can be
 * used directly as a boolean. */
int zuiLongLongFromValue ( zsetopval *val )
{
    if ( ! ( val->flags & OPVAL_DIRTY_LL ) )
    {
        val->flags |= OPVAL_DIRTY_LL;

        if ( val->ele != NULL )
        {
            if ( val->ele->encoding == OBJ_ENCODING_INT )
            {
                /* Integer encoded objects keep the value in the pointer. */
                val->ell = ( long ) val->ele->ptr;
                val->flags |= OPVAL_VALID_LL;
            }
            else if ( sdsEncodedObject (val->ele) )
            {
                if ( string2ll (val->ele->ptr, sdslen (val->ele->ptr), &val->ell) )
                    val->flags |= OPVAL_VALID_LL;
            }
            else
            {
                /* Unsupported object encoding: no long long is available.
                 * Report "not valid" (0) rather than a non-zero error code,
                 * which callers testing the result as a boolean would take
                 * for success. */
                return 0;
            }
        }
        else if ( val->estr != NULL )
        {
            if ( string2ll (( char* ) val->estr, val->elen, &val->ell) )
                val->flags |= OPVAL_VALID_LL;
        }
        else
        {
            /* The long long was already set, flag as valid. */
            val->flags |= OPVAL_VALID_LL;
        }
    }
    return val->flags & OPVAL_VALID_LL;
}

/* Return the value in 'val' as an object. When the value has no object yet,
 * one is created from the string buffer if present, otherwise from the long
 * long, and OPVAL_DIRTY_ROBJ is set so that the next zuiNext call releases
 * it. */
robj *zuiObjectFromValue ( zsetopval *val )
{
    if ( val->ele != NULL ) return val->ele;

    val->ele = ( val->estr != NULL )
        ? createStringObject (( char* ) val->estr, val->elen)
        : createStringObjectFromLongLong (val->ell);
    val->flags |= OPVAL_DIRTY_ROBJ;

    return val->ele;
}

/* Make val->estr / val->elen describe the value in 'val' as a string buffer.
 * Integers (an integer encoded object, or the plain long long when there is
 * no object) are formatted into the private buffer of 'val'; sds encoded
 * objects are referenced in place. Returns 1 on success, 0 when the object
 * has an encoding that cannot be handled. */
int zuiBufferFromValue ( zsetopval *val )
{
    /* Already available. */
    if ( val->estr != NULL ) return 1;

    if ( val->ele == NULL )
    {
        val->elen = ll2string (( char* ) val->_buf, sizeof (val->_buf ), val->ell);
        val->estr = val->_buf;
        return 1;
    }

    if ( val->ele->encoding == OBJ_ENCODING_INT )
    {
        val->elen = ll2string (( char* ) val->_buf, sizeof (val->_buf ), ( long ) val->ele->ptr);
        val->estr = val->_buf;
        return 1;
    }

    if ( sdsEncodedObject (val->ele) )
    {
        val->elen = sdslen (val->ele->ptr);
        val->estr = val->ele->ptr;
        return 1;
    }

    return 0;
}

/* Look up the value held by 'val' in the source 'op'. When found, return 1
 * and store its score in *score (always 1.0 for plain sets). Return 0 when
 * the value is missing, the source has no subject, or its type/encoding is
 * not supported. */
int zuiFind ( zsetopsrc *op, zsetopval *val, double *score )
{
    if ( op->subject == NULL ) return 0;

    if ( op->type == OBJ_SET )
    {
        if ( op->encoding == OBJ_ENCODING_INTSET )
        {
            /* Only values with a long long representation can be members
             * of an intset. */
            if ( ! zuiLongLongFromValue (val) ) return 0;
            if ( ! intsetFind (op->subject->ptr, val->ell) ) return 0;
            *score = 1.0;
            return 1;
        }
        if ( op->encoding == OBJ_ENCODING_HT )
        {
            dict *ht = op->subject->ptr;

            zuiObjectFromValue (val);
            if ( dictFind (ht, val->ele) == NULL ) return 0;
            *score = 1.0;
            return 1;
        }
        return 0;
    }

    if ( op->type == OBJ_ZSET )
    {
        /* Both sorted set encodings are searched by object. */
        zuiObjectFromValue (val);

        if ( op->encoding == OBJ_ENCODING_ZIPLIST )
        {
            /* Score is set by zzlFind itself on a match. */
            return zzlFind (op->subject->ptr, val->ele, score) != NULL;
        }
        if ( op->encoding == OBJ_ENCODING_SKIPLIST )
        {
            zset *zs = op->subject->ptr;
            dictEntry *de = dictFind (zs->dict, val->ele);

            if ( de == NULL ) return 0;
            *score = * ( double* ) dictGetVal (de);
            return 1;
        }
        return 0;
    }

    return 0;
}

/* Comparison callback ordering two zsetopsrc entries by the length reported
 * by zuiLength: negative when s1 is smaller, positive when it is larger,
 * 0 when both have the same length. */
int zuiCompareByCardinality ( const void *s1, const void *s2 )
{
    int len1 = zuiLength (( zsetopsrc* ) s1);
    int len2 = zuiLength (( zsetopsrc* ) s2);

    return len1 - len2;
}

#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))

/* Fold 'val' into the running score '*target' according to 'aggregate'
 * (one of the REDIS_AGGR_* constants). Any other aggregate value leaves
 * '*target' untouched. */
inline static void zunionInterAggregate ( double *target, double val, int aggregate )
{
    switch ( aggregate )
    {
        case REDIS_AGGR_SUM:
            *target += val;
            /* The result of adding two doubles is NaN when one variable
             * is +inf and the other is -inf. When these numbers are added,
             * we maintain the convention of the result being 0.0. */
            if ( isnan (*target) ) *target = 0.0;
            break;
        case REDIS_AGGR_MIN:
            if ( val < *target ) *target = val;
            break;
        case REDIS_AGGR_MAX:
            if ( val > *target ) *target = val;
            break;
        default:
            /* safety net */
            break;
    }
}

/* Shared implementation of ZUNIONSTORE and ZINTERSTORE.
 *
 * Arguments are read from the client: c->argv[2] is the number of input
 * keys, the keys themselves start at c->argv[3], and the optional
 * "WEIGHTS w1 .. wN" and "AGGREGATE SUM|MIN|MAX" clauses may follow.
 * 'dstkey' is the key that receives the result and 'op' selects
 * SET_OP_UNION or SET_OP_INTER.
 *
 * Returns C_OK after storing a non-empty result under 'dstkey' and replying
 * with its cardinality. Returns C_ERR on malformed arguments, on an input
 * key that is neither a set nor a sorted set, on an unknown 'op', and when
 * the computed result is empty (in the last case 'dstkey' has already been
 * deleted). Every exit path releases the temporary source array and, when
 * it was not stored in the database, the result object. */
int zunionInterGenericCommand ( client *c, robj *dstkey, int op )
{
    int i, j;
    long setnum;
    int aggregate = REDIS_AGGR_SUM;
    zsetopsrc *src;
    zsetopval zval;
    robj *tmp;
    unsigned int maxelelen = 0;
    robj *dstobj;
    zset *dstzset;
    zskiplistNode *znode;

    /* expect setnum input keys to be given */
    if ( getLongFromObject (c->argv[2], &setnum) != C_OK )
        return C_ERR;

    if ( setnum < 1 )
        return C_ERR;

    /* test if the expected number of keys would overflow */
    if ( setnum > c->argc - 3 )
        return C_ERR;

    /* read keys to be used for input */
    src = zcalloc (sizeof (zsetopsrc ) * setnum);
    for ( i = 0, j = 3; i < setnum; i ++, j ++ )
    {
        robj *obj = lookupKeyWrite (c->db, c->argv[j]);
        if ( obj != NULL )
        {
            if ( obj->type != OBJ_ZSET && obj->type != OBJ_SET )
            {
                zfree (src);
                return C_ERR;
            }

            src[i].subject = obj;
            src[i].type = obj->type;
            src[i].encoding = obj->encoding;
        }
        else
        {
            /* Missing keys are kept as sources without a subject. */
            src[i].subject = NULL;
        }

        /* Default all weights to 1. */
        src[i].weight = 1.0;
    }

    /* parse optional extra arguments */
    if ( j < c->argc )
    {
        int remaining = c->argc - j;

        while ( remaining )
        {
            if ( remaining >= ( setnum + 1 ) && ! strcasecmp (c->argv[j]->ptr, "weights") )
            {
                j ++;
                remaining --;
                for ( i = 0; i < setnum; i ++, j ++, remaining -- )
                {
                    if ( getDoubleFromObject (c->argv[j], &src[i].weight) != C_OK )
                    {
                        zfree (src);
                        return C_ERR;
                    }
                }
            }
            else if ( remaining >= 2 && ! strcasecmp (c->argv[j]->ptr, "aggregate") )
            {
                j ++;
                remaining --;
                if ( ! strcasecmp (c->argv[j]->ptr, "sum") )
                {
                    aggregate = REDIS_AGGR_SUM;
                }
                else if ( ! strcasecmp (c->argv[j]->ptr, "min") )
                {
                    aggregate = REDIS_AGGR_MIN;
                }
                else if ( ! strcasecmp (c->argv[j]->ptr, "max") )
                {
                    aggregate = REDIS_AGGR_MAX;
                }
                else
                {
                    zfree (src);
                    return C_ERR;
                }
                j ++;
                remaining --;
            }
            else
            {
                zfree (src);
                return C_ERR;
            }
        }
    }

    /* sort sets from the smallest to largest, this will improve our
     * algorithm's performance */
    qsort (src, setnum, sizeof (zsetopsrc ), zuiCompareByCardinality);

    dstobj = createZsetObject ();
    dstzset = dstobj->ptr;
    memset (&zval, 0, sizeof (zval ));

    if ( op == SET_OP_INTER )
    {
        /* Skip everything if the smallest input is empty. */
        if ( zuiLength (&src[0]) > 0 )
        {
            /* Precondition: as src[0] is non-empty and the inputs are ordered
             * by size, all src[i > 0] are non-empty too. */
            zuiInitIterator (&src[0]);
            while ( zuiNext (&src[0], &zval) )
            {
                double score, value;

                score = src[0].weight * zval.score;
                if ( isnan (score) ) score = 0;

                for ( j = 1; j < setnum; j ++ )
                {
                    /* It is not safe to access the zset we are
                     * iterating, so explicitly check for equal object. */
                    if ( src[j].subject == src[0].subject )
                    {
                        value = zval.score * src[j].weight;
                        zunionInterAggregate (&score, value, aggregate);
                    }
                    else if ( zuiFind (&src[j], &zval, &value) )
                    {
                        value *= src[j].weight;
                        zunionInterAggregate (&score, value, aggregate);
                    }
                    else
                    {
                        break;
                    }
                }

                /* Only continue when present in every input. */
                if ( j == setnum )
                {
                    tmp = zuiObjectFromValue (&zval);
                    znode = zslInsert (dstzset->zsl, score, tmp);
                    incrRefCount (tmp); /* added to skiplist */
                    dictAdd (dstzset->dict, tmp, &znode->score);
                    incrRefCount (tmp); /* added to dictionary */

                    if ( sdsEncodedObject (tmp) )
                    {
                        if ( sdslen (tmp->ptr) > maxelelen )
                            maxelelen = sdslen (tmp->ptr);
                    }
                }
            }
            zuiClearIterator (&src[0]);
        }
    }
    else if ( op == SET_OP_UNION )
    {
        dict *accumulator = dictCreate (&setDictType, NULL);
        dictIterator *di;
        dictEntry *de;
        double score;

        if ( setnum )
        {
            /* Our union is at least as large as the largest set.
             * Resize the dictionary ASAP to avoid useless rehashing. */
            dictExpand (accumulator, zuiLength (&src[setnum - 1]));
        }

        /* Step 1: Create a dictionary of elements -> aggregated-scores
         * by iterating one sorted set after the other. */
        for ( i = 0; i < setnum; i ++ )
        {
            if ( zuiLength (&src[i]) == 0 ) continue;

            zuiInitIterator (&src[i]);
            while ( zuiNext (&src[i], &zval) )
            {
                /* Initialize value */
                score = src[i].weight * zval.score;
                if ( isnan (score) ) score = 0;

                /* Search for this element in the accumulating dictionary. */
                de = dictFind (accumulator, zuiObjectFromValue (&zval));
                /* If we don't have it, we need to create a new entry. */
                if ( de == NULL )
                {
                    tmp = zuiObjectFromValue (&zval);
                    /* Remember the longest single element encountered,
                     * to understand if it's possible to convert to ziplist
                     * at the end. */
                    if ( sdsEncodedObject (tmp) )
                    {
                        if ( sdslen (tmp->ptr) > maxelelen )
                            maxelelen = sdslen (tmp->ptr);
                    }
                    /* Add the element with its initial score. */
                    de = dictAddRaw (accumulator, tmp);
                    incrRefCount (tmp);
                    dictSetDoubleVal (de, score);
                }
                else
                {
                    /* Update the score with the score of the new instance
                     * of the element found in the current sorted set.
                     *
                     * Here we access directly the dictEntry double
                     * value inside the union as it is a big speedup
                     * compared to using the getDouble/setDouble API. */
                    zunionInterAggregate (&de->v.d, score, aggregate);
                }
            }
            zuiClearIterator (&src[i]);
        }

        /* Step 2: convert the dictionary into the final sorted set. */
        di = dictGetIterator (accumulator);

        /* We now are aware of the final size of the resulting sorted set,
         * let's resize the dictionary embedded inside the sorted set to the
         * right size, in order to save rehashing time. */
        dictExpand (dstzset->dict, dictSize (accumulator));

        while ( ( de = dictNext (di) ) != NULL )
        {
            robj *ele = dictGetKey (de);
            score = dictGetDoubleVal (de);
            znode = zslInsert (dstzset->zsl, score, ele);
            incrRefCount (ele); /* added to skiplist */
            dictAdd (dstzset->dict, ele, &znode->score);
            incrRefCount (ele); /* added to dictionary */
        }
        dictReleaseIterator (di);

        /* We can free the accumulator dictionary now. */
        dictRelease (accumulator);
    }
    else
    {
        /* Unknown operation: nothing was stored, so both the result object
         * and the source array must be released here. */
        decrRefCount (dstobj);
        zfree (src);
        return C_ERR;
    }

    /* The destination is replaced even when the result turns out empty. */
    dbDelete (c->db, dstkey);

    if ( dstzset->zsl->length == 0 )
    {
        /* Empty result: drop the unused object and the source array. */
        decrRefCount (dstobj);
        zfree (src);
        return C_ERR;
    }

    zsetConvertToZiplistIfNeeded (dstobj, maxelelen);
    dbAdd (c->db, dstkey, dstobj);
    addReplyLongLong (c, zsetLength (dstobj));

    zfree (src);
    return C_OK;
}

/* ZUNIONSTORE: union of the input keys, stored under c->argv[1]. */
int zunionstoreCommand ( client *c )
{
    robj *dstkey = c->argv[1];

    return zunionInterGenericCommand (c, dstkey, SET_OP_UNION);
}

/* ZINTERSTORE: intersection of the input keys, stored under c->argv[1]. */
int zinterstoreCommand ( client *c )
{
    robj *dstkey = c->argv[1];

    return zunionInterGenericCommand (c, dstkey, SET_OP_INTER);
}

/* Shared implementation of ZRANGE and ZREVRANGE.
 *
 * c->argv[1] is the key, c->argv[2] and c->argv[3] are the start/end ranks
 * (negative values count from the end) and an optional fifth argument
 * "withscores" switches the reply type passed to the addReplyJson* helpers
 * from 0 to 1. When 'reverse' is non-zero the set is walked from the tail.
 *
 * Returns C_OK after emitting the reply, C_ERR when the arguments are
 * malformed, the key is missing or of the wrong type, the requested range
 * is empty, or the encoding is unknown. */
int zrangeGenericCommand ( client *c, int reverse )
{
    robj *zobj;
    long start;
    long end;
    int llen;
    int rangelen;
    int type = 0;

    if ( getLongFromObject (c->argv[2], &start) != C_OK ) return C_ERR;
    if ( getLongFromObject (c->argv[3], &end) != C_OK ) return C_ERR;

    /* The only accepted trailing argument is a single "withscores". */
    if ( c->argc >= 5 )
    {
        if ( c->argc != 5 || strcasecmp (c->argv[4]->ptr, "withscores") != 0 )
            return C_ERR;
        type = 1;
    }

    zobj = lookupKeyRead (c->db, c->argv[1]);
    if ( zobj == NULL || checkType (c, zobj, OBJ_ZSET) ) return C_ERR;

    /* Sanitize indexes. */
    llen = zsetLength (zobj);
    if ( start < 0 ) start += llen;
    if ( end < 0 ) end += llen;
    if ( start < 0 ) start = 0;

    /* Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length. */
    if ( start > end || start >= llen ) return C_ERR;

    if ( end >= llen ) end = llen - 1;
    rangelen = ( end - start ) + 1;

    if ( zobj->encoding == OBJ_ENCODING_ZIPLIST )
    {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;
        /* Entries are stored as element/score pairs, hence the factor 2. */
        long first = reverse ? - 2 - ( 2 * start ) : 2 * start;

        eptr = ziplistIndex (zl, first);
        sptr = ziplistNext (zl, eptr);

        addReplyJsonHead (c);
        for ( ; rangelen != 0; rangelen -- )
        {
            double score;

            ziplistGet (eptr, &vstr, &vlen, &vlong);
            score = zzlGetScore (sptr);

            /* A NULL string pointer means the entry is an integer. */
            if ( vstr != NULL )
                addReplyJsonString (c, type, ( char * ) vstr, vlen, score);
            else
                addReplyJsonLongLong (c, type, vlong, score);
            addReplyJsonSep (c);

            if ( reverse )
                zzlPrev (zl, &eptr, &sptr);
            else
                zzlNext (zl, &eptr, &sptr);

            /* NOTE(review): outOfRange() is defined elsewhere; it is
             * consulted after every element to stop the output early. */
            if ( outOfRange (c) ) break;
        }
        addReplyJsonTail (c);
    }
    else if ( zobj->encoding == OBJ_ENCODING_SKIPLIST )
    {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;

        /* Check if starting point is trivial, before doing log(N) lookup. */
        if ( reverse )
            ln = ( start > 0 ) ? zslGetElementByRank (zsl, llen - start)
                               : zsl->tail;
        else
            ln = ( start > 0 ) ? zslGetElementByRank (zsl, start + 1)
                               : zsl->header->level[0].forward;

        addReplyJsonHead (c);
        for ( ; rangelen != 0; rangelen -- )
        {
            addReplyJson (c, type, ln->obj, ln->score);
            addReplyJsonSep (c);

            if ( reverse )
                ln = ln->backward;
            else
                ln = ln->level[0].forward;

            if ( outOfRange (c) ) break;
        }
        addReplyJsonTail (c);
    }
    else
    {
        return C_ERR;
    }

    return C_OK;
}

/* ZRANGE: rank range walked in forward order. */
int zrangeCommand ( client *c )
{
    const int reverse = 0;

    return zrangeGenericCommand (c, reverse);
}

/* ZREVRANGE: rank range walked in reverse order. */
int zrevrangeCommand ( client *c )
{
    const int reverse = 1;

    return zrangeGenericCommand (c, reverse);
}

/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE.
 *
 * c->argv[1] is the key; c->argv[2] and c->argv[3] hold the score interval,
 * given as min,max or, when 'reverse' is non-zero, as max,min. The optional
 * "withscores" and "limit <offset> <count>" clauses may follow in any order.
 *
 * Returns C_OK after emitting the reply through the addReplyJson* helpers.
 * Returns C_ERR on malformed arguments, a missing or wrongly typed key, an
 * interval that matches no element, or an unknown encoding. */
int genericZrangebyscoreCommand ( client *c, int reverse )
{
    zrangespec range;
    robj *key = c->argv[1];
    robj *zobj;
    long offset = 0, limit = - 1;
    int minidx, maxidx;
    int type = 0;   /* reply type for addReplyJson*: 1 once WITHSCORES is seen */

    /* Parse the range arguments. */
    if ( reverse )
    {
        /* Range is given as [max,min] */
        maxidx = 2;
        minidx = 3;
    }
    else
    {
        /* Range is given as [min,max] */
        minidx = 2;
        maxidx = 3;
    }

    if ( zslParseRange (c->argv[minidx], c->argv[maxidx], &range) != C_OK )
    {
        return C_ERR;
    }

    /* Parse optional extra arguments. */
    if ( c->argc > 4 )
    {
        int remaining = c->argc - 4;
        int pos = 4;

        while ( remaining )
        {
            if ( remaining >= 1 && ! strcasecmp (c->argv[pos]->ptr, "withscores") )
            {
                pos ++;
                remaining --;
                type = 1;
            }
            else if ( remaining >= 3 && ! strcasecmp (c->argv[pos]->ptr, "limit") )
            {
                if ( ( getLongFromObject (c->argv[pos + 1], &offset) != C_OK ) ||
                     ( getLongFromObject (c->argv[pos + 2], &limit) != C_OK ) ) return C_ERR;
                pos += 3;
                remaining -= 3;
            }
            else
            {
                return C_ERR;
            }
        }
    }

    /* Ok, lookup the key and get the range */
    if ( ( zobj = lookupKeyRead (c->db, key) ) == NULL ||
         checkType (c, zobj, OBJ_ZSET) ) return C_ERR;

    if ( zobj->encoding == OBJ_ENCODING_ZIPLIST )
    {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;
        double score;

        /* If reversed, get the last node in range as starting point. */
        if ( reverse )
        {
            eptr = zzlLastInRange (zl, &range);
        }
        else
        {
            eptr = zzlFirstInRange (zl, &range);
        }

        /* No "first" element in the specified interval. */
        if ( eptr == NULL )
        {
            return C_ERR;
        }

        /* Get score pointer for the first element. */
        sptr = ziplistNext (zl, eptr);

        addReplyJsonHead (c);

        /* If there is an offset, just traverse the number of elements without
         * checking the score because that is done in the next loop. */
        while ( eptr && offset -- )
        {
            if ( reverse )
            {
                zzlPrev (zl, &eptr, &sptr);
            }
            else
            {
                zzlNext (zl, &eptr, &sptr);
            }
        }

        /* limit starts at -1 when no LIMIT clause was given, so the
         * post-decrement test then never stops the loop by itself. */
        while ( eptr && limit -- )
        {
            score = zzlGetScore (sptr);

            /* Abort when the node is no longer in range. */
            if ( reverse )
            {
                if ( ! zslValueGteMin (score, &range) ) break;
            }
            else
            {
                if ( ! zslValueLteMax (score, &range) ) break;
            }

            /* We know the element exists, so ziplistGet should always succeed */
            ziplistGet (eptr, &vstr, &vlen, &vlong);

            /* A NULL string pointer means the entry is an integer. */
            if ( vstr == NULL )
            {
                addReplyJsonLongLong (c, type, vlong, score);
            }
            else
            {
                addReplyJsonString (c, type, ( char * ) vstr, vlen, score);
            }
            addReplyJsonSep (c);

            /* Move to next node */
            if ( reverse )
            {
                zzlPrev (zl, &eptr, &sptr);
            }
            else
            {
                zzlNext (zl, &eptr, &sptr);
            }

            /* NOTE(review): outOfRange() is defined elsewhere; it is
             * consulted after every element to stop the output early. */
            if ( outOfRange (c) )
            {
                break;
            }
        }
        addReplyJsonTail (c);
    }
    else if ( zobj->encoding == OBJ_ENCODING_SKIPLIST )
    {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;

        /* If reversed, get the last node in range as starting point. */
        if ( reverse )
        {
            ln = zslLastInRange (zsl, &range);
        }
        else
        {
            ln = zslFirstInRange (zsl, &range);
        }

        /* No "first" element in the specified interval. */
        if ( ln == NULL )
        {
            return C_ERR;
        }

        /* If there is an offset, just traverse the number of elements without
         * checking the score because that is done in the next loop. */
        while ( ln && offset -- )
        {
            if ( reverse )
            {
                ln = ln->backward;
            }
            else
            {
                ln = ln->level[0].forward;
            }
        }

        addReplyJsonHead (c);
        while ( ln && limit -- )
        {
            /* Abort when the node is no longer in range. */
            if ( reverse )
            {
                if ( ! zslValueGteMin (ln->score, &range) ) break;
            }
            else
            {
                if ( ! zslValueLteMax (ln->score, &range) ) break;
            }

            addReplyJson (c, type, ln->obj, ln->score);
            addReplyJsonSep (c);

            /* Move to next node */
            if ( reverse )
            {
                ln = ln->backward;
            }
            else
            {
                ln = ln->level[0].forward;
            }
            if ( outOfRange (c) )
            {
                break;
            }
        }
        addReplyJsonTail (c);
    }
    else
    {
        return C_ERR;
    }

    return C_OK;
}

/* ZRANGEBYSCORE: score interval walked in forward order. */
int zrangebyscoreCommand ( client *c )
{
    const int reverse = 0;

    return genericZrangebyscoreCommand (c, reverse);
}

/* ZREVRANGEBYSCORE: score interval walked in reverse order. */
int zrevrangebyscoreCommand ( client *c )
{
    const int reverse = 1;

    return genericZrangebyscoreCommand (c, reverse);
}

/* ZCOUNT: reply with the number of elements of the sorted set at c->argv[1]
 * whose score lies inside the interval given by c->argv[2] and c->argv[3].
 *
 * Returns C_OK after replying with the count. Returns C_ERR when the
 * interval cannot be parsed, the key is missing or of the wrong type, the
 * encoding is unknown, or (ziplist encoding only) no element is in range.
 * NOTE(review): with the skiplist encoding an empty interval replies 0 and
 * returns C_OK instead; confirm which of the two behaviours is intended. */
int zcountCommand ( client *c )
{
    robj *zobj;
    zrangespec range;
    int count = 0;

    /* Parse the range arguments */
    if ( zslParseRange (c->argv[2], c->argv[3], &range) != C_OK ) return C_ERR;

    /* Lookup the sorted set */
    zobj = lookupKeyRead (c->db, c->argv[1]);
    if ( zobj == NULL || checkType (c, zobj, OBJ_ZSET) ) return C_ERR;

    if ( zobj->encoding == OBJ_ENCODING_ZIPLIST )
    {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;

        /* Use the first element in range as the starting point */
        eptr = zzlFirstInRange (zl, &range);
        if ( eptr == NULL ) return C_ERR;

        sptr = ziplistNext (zl, eptr);

        /* The first element is expected to be in range; the outcome of
         * this check is not acted upon. */
        ( void ) zslValueLteMax (zzlGetScore (sptr), &range);

        /* Count elements until one falls past the upper bound. */
        while ( eptr != NULL && zslValueLteMax (zzlGetScore (sptr), &range) )
        {
            count ++;
            zzlNext (zl, &eptr, &sptr);
        }
    }
    else if ( zobj->encoding == OBJ_ENCODING_SKIPLIST )
    {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *first = zslFirstInRange (zsl, &range);

        if ( first != NULL )
        {
            zskiplistNode *last;
            unsigned long rank;

            /* Preliminary count: everything from the first match onwards. */
            rank = zslGetRank (zsl, first->score, first->obj);
            count = ( zsl->length - ( rank - 1 ) );

            /* Subtract whatever lies after the last match, if any. */
            last = zslLastInRange (zsl, &range);
            if ( last != NULL )
            {
                rank = zslGetRank (zsl, last->score, last->obj);
                count -= ( zsl->length - rank );
            }
        }
    }
    else
    {
        return C_ERR;
    }

    addReplyLongLong (c, count);
    return C_OK;
}

/* ZLEXCOUNT: reply with the number of elements of the sorted set at
 * c->argv[1] that fall inside the lexicographic interval given by
 * c->argv[2] and c->argv[3].
 *
 * Returns C_OK after replying with the count. Returns C_ERR when the
 * interval cannot be parsed, the key is missing or of the wrong type, the
 * encoding is unknown, or (ziplist encoding only) no element is in range.
 * Once the interval has been parsed, every exit path releases it with
 * zslFreeLexRange(). */
int zlexcountCommand ( client *c )
{
    robj *key = c->argv[1];
    robj *zobj;
    zlexrangespec range;
    int count = 0;
    int status = C_ERR;

    /* Parse the range arguments */
    if ( zslParseLexRange (c->argv[2], c->argv[3], &range) != C_OK )
    {
        return C_ERR;
    }

    /* Lookup the sorted set */
    if ( ( zobj = lookupKeyRead (c->db, key) ) == NULL ||
         checkType (c, zobj, OBJ_ZSET) )
    {
        goto done;
    }

    if ( zobj->encoding == OBJ_ENCODING_ZIPLIST )
    {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;

        /* Use the first element in range as the starting point */
        eptr = zzlFirstInLexRange (zl, &range);

        /* No "first" element */
        if ( eptr == NULL )
        {
            goto done;
        }

        sptr = ziplistNext (zl, eptr);

        /* The first element is expected to be in range; the result is
         * intentionally ignored.
         * NOTE(review): looks like the remains of an assertion - confirm. */
        ( void ) zzlLexValueLteMax (eptr, &range);

        /* Iterate over elements in range */
        while ( eptr )
        {
            /* Abort when the node is no longer in range. */
            if ( ! zzlLexValueLteMax (eptr, &range) )
            {
                break;
            }

            count ++;
            zzlNext (zl, &eptr, &sptr);
        }
    }
    else if ( zobj->encoding == OBJ_ENCODING_SKIPLIST )
    {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *zn;
        unsigned long rank;

        /* Find first element in range */
        zn = zslFirstInLexRange (zsl, &range);

        /* Use rank of first element, if any, to determine preliminary count */
        if ( zn != NULL )
        {
            rank = zslGetRank (zsl, zn->score, zn->obj);
            count = ( zsl->length - ( rank - 1 ) );

            /* Find last element in range */
            zn = zslLastInLexRange (zsl, &range);

            /* Use rank of last element, if any, to determine the actual count */
            if ( zn != NULL )
            {
                rank = zslGetRank (zsl, zn->score, zn->obj);
                count -= ( zsl->length - rank );
            }
        }
    }
    else
    {
        /* Unknown encoding: still release the parsed range below. */
        goto done;
    }

    addReplyLongLong (c, count);
    status = C_OK;

done:
    zslFreeLexRange (&range);
    return status;
}

/* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX.
 *
 * c->argv[1] is the key; c->argv[2] and c->argv[3] hold the lexicographic
 * interval, given as min,max or, when 'reverse' is non-zero, as max,min.
 * Zero or more "limit <offset> <count>" clauses may follow.
 *
 * Returns C_OK after emitting one reply per matching element. Returns C_ERR
 * on malformed arguments, a missing or wrongly typed key, an interval that
 * matches no element, or an unknown encoding. Once the interval has been
 * parsed, every exit path releases it with zslFreeLexRange(). */
int genericZrangebylexCommand ( client *c, int reverse )
{
    zlexrangespec range;
    robj *key = c->argv[1];
    robj *zobj;
    long offset = 0, limit = - 1;
    int minidx, maxidx;
    int status = C_ERR;

    /* Parse the range arguments. */
    if ( reverse )
    {
        /* Range is given as [max,min] */
        maxidx = 2;
        minidx = 3;
    }
    else
    {
        /* Range is given as [min,max] */
        minidx = 2;
        maxidx = 3;
    }

    if ( zslParseLexRange (c->argv[minidx], c->argv[maxidx], &range) != C_OK )
    {
        return C_ERR;
    }

    /* Parse optional extra arguments: only LIMIT is understood. */
    if ( c->argc > 4 )
    {
        int remaining = c->argc - 4;
        int pos = 4;

        while ( remaining )
        {
            if ( remaining < 3 || strcasecmp (c->argv[pos]->ptr, "limit") != 0 )
            {
                goto done;
            }

            /* A malformed offset/count must not leak the parsed range. */
            if ( ( getLongFromObject (c->argv[pos + 1], &offset) != C_OK ) ||
                 ( getLongFromObject (c->argv[pos + 2], &limit) != C_OK ) )
            {
                goto done;
            }
            pos += 3;
            remaining -= 3;
        }
    }

    /* Ok, lookup the key and get the range */
    if ( ( zobj = lookupKeyRead (c->db, key) ) == NULL ||
         checkType (c, zobj, OBJ_ZSET) )
    {
        goto done;
    }

    if ( zobj->encoding == OBJ_ENCODING_ZIPLIST )
    {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;

        /* If reversed, get the last node in range as starting point. */
        if ( reverse )
        {
            eptr = zzlLastInLexRange (zl, &range);
        }
        else
        {
            eptr = zzlFirstInLexRange (zl, &range);
        }

        /* No "first" element in the specified interval. */
        if ( eptr == NULL )
        {
            goto done;
        }

        /* Get score pointer for the first element. */
        sptr = ziplistNext (zl, eptr);

        /* If there is an offset, just traverse the number of elements without
         * checking the range because that is done in the next loop. */
        while ( eptr && offset -- )
        {
            if ( reverse )
            {
                zzlPrev (zl, &eptr, &sptr);
            }
            else
            {
                zzlNext (zl, &eptr, &sptr);
            }
        }

        /* limit starts at -1 when no LIMIT clause was given, so the
         * post-decrement test then never stops the loop by itself. */
        while ( eptr && limit -- )
        {
            /* Abort when the node is no longer in range. */
            if ( reverse )
            {
                if ( ! zzlLexValueGteMin (eptr, &range) ) break;
            }
            else
            {
                if ( ! zzlLexValueLteMax (eptr, &range) ) break;
            }

            /* We know the element exists, so ziplistGet should always
             * succeed. */
            ziplistGet (eptr, &vstr, &vlen, &vlong);

            /* A NULL string pointer means the entry is an integer. */
            if ( vstr == NULL )
            {
                addReplyLongLong (c, vlong);
            }
            else
            {
                addReplyString (c, ( char * ) vstr, vlen);
            }

            /* Move to next node */
            if ( reverse )
            {
                zzlPrev (zl, &eptr, &sptr);
            }
            else
            {
                zzlNext (zl, &eptr, &sptr);
            }
        }
    }
    else if ( zobj->encoding == OBJ_ENCODING_SKIPLIST )
    {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;

        /* If reversed, get the last node in range as starting point. */
        if ( reverse )
        {
            ln = zslLastInLexRange (zsl, &range);
        }
        else
        {
            ln = zslFirstInLexRange (zsl, &range);
        }

        /* No "first" element in the specified interval. */
        if ( ln == NULL )
        {
            goto done;
        }

        /* If there is an offset, just traverse the number of elements without
         * checking the range because that is done in the next loop. */
        while ( ln && offset -- )
        {
            if ( reverse )
            {
                ln = ln->backward;
            }
            else
            {
                ln = ln->level[0].forward;
            }
        }

        while ( ln && limit -- )
        {
            /* Abort when the node is no longer in range. */
            if ( reverse )
            {
                if ( ! zslLexValueGteMin (ln->obj, &range) ) break;
            }
            else
            {
                if ( ! zslLexValueLteMax (ln->obj, &range) ) break;
            }

            addReply (c, ln->obj);

            /* Move to next node */
            if ( reverse )
            {
                ln = ln->backward;
            }
            else
            {
                ln = ln->level[0].forward;
            }
        }
    }
    else
    {
        /* Unknown encoding: still release the parsed range below. */
        goto done;
    }

    status = C_OK;

done:
    zslFreeLexRange (&range);
    return status;
}

/* ZRANGEBYLEX: lexicographic interval walked in forward order. */
int zrangebylexCommand ( client *c )
{
    const int reverse = 0;

    return genericZrangebylexCommand (c, reverse);
}

/* ZREVRANGEBYLEX: lexicographic interval walked in reverse order. */
int zrevrangebylexCommand ( client *c )
{
    const int reverse = 1;

    return genericZrangebylexCommand (c, reverse);
}

/* ZCARD: reply with zsetLength() of the sorted set stored at c->argv[1].
 * Returns C_ERR without that reply when the key is missing or checkType()
 * rejects the object, C_OK otherwise. */
int zcardCommand ( client *c )
{
    robj *zobj = lookupKeyRead (c->db, c->argv[1]);

    if ( zobj == NULL ) return C_ERR;
    if ( checkType (c, zobj, OBJ_ZSET) ) return C_ERR;

    addReplyLongLong (c, zsetLength (zobj));
    return C_OK;
}

/* ZSCORE: reply with the score of member c->argv[2] in the sorted set at
 * c->argv[1]. Returns C_ERR when the key is missing, checkType() rejects
 * the object, or zsetScore() reports C_ERR; C_OK otherwise. */
int zscoreCommand ( client *c )
{
    double score;
    robj *zobj = lookupKeyRead (c->db, c->argv[1]);

    if ( zobj == NULL || checkType (c, zobj, OBJ_ZSET) ) return C_ERR;

    if ( zsetScore (zobj, c->argv[2], &score) == C_ERR ) return C_ERR;

    addReplyDouble (c, score);
    return C_OK;
}

/* Shared implementation behind zrankCommand() and zrevrankCommand().
 *
 * Looks up member argv[2] in the sorted set stored at argv[1] and replies
 * with its position:
 *   - reverse == 0: rank - 1
 *   - reverse != 0: length - rank
 * where `rank` is 1-based. When the member is not present a NULL reply
 * is sent instead.
 *
 * Returns C_ERR when the key lookup yields NULL, when checkType() reports
 * a non-zero result for OBJ_ZSET, or when the object has an encoding other
 * than ziplist/skiplist; C_OK in every other case (including the "member
 * not found" one). */
int zrankGenericCommand ( client *c, int reverse )
{
    robj *member = c->argv[2];
    robj *zobj = lookupKeyRead (c->db, c->argv[1]);
    unsigned long card;
    unsigned long rank = 0;
    int found = 0;

    if ( zobj == NULL ) return C_ERR;
    if ( checkType (c, zobj, OBJ_ZSET) ) return C_ERR;
    card = zsetLength (zobj);

    /* Phase 1: compute the 1-based rank according to the encoding. */
    if ( zobj->encoding == OBJ_ENCODING_ZIPLIST )
    {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr = ziplistIndex (zl, 0);
        unsigned char *sptr = ziplistNext (zl, eptr);

        /* Linear walk over the element entries, counting as we go, until
         * one compares equal to the requested member. */
        for ( rank = 1; eptr != NULL; rank ++ )
        {
            if ( ziplistCompare (eptr, member->ptr, sdslen (member->ptr)) )
            {
                found = 1;
                break;
            }
            zzlNext (zl, &eptr, &sptr);
        }
    }
    else if ( zobj->encoding == OBJ_ENCODING_SKIPLIST )
    {
        zset *zs = zobj->ptr;
        dictEntry *de = dictFind (zs->dict, member);

        if ( de != NULL )
        {
            /* The dict entry's value points at the member's score, which
             * zslGetRank() takes together with the member itself. */
            double score = * ( double* ) dictGetVal (de);

            rank = zslGetRank (zs->zsl, score, member);
            found = 1;
        }
    }
    else
    {
        return C_ERR;
    }

    /* Phase 2: emit the reply. */
    if ( ! found )
        addReplyNULL (c);
    else if ( reverse )
        addReplyLongLong (c, card - rank);
    else
        addReplyLongLong (c, rank - 1);

    return C_OK;
}

/* Rank lookup in forward order: calls zrankGenericCommand() with
 * reverse == 0 and propagates its status code. */
int zrankCommand ( client *c )
{
    const int reverse = 0;

    return zrankGenericCommand (c, reverse);
}

/* Rank lookup in reverse order: calls zrankGenericCommand() with
 * reverse == 1 and propagates its status code. */
int zrevrankCommand ( client *c )
{
    const int reverse = 1;

    return zrankGenericCommand (c, reverse);
}

/* Parses the cursor from argv[2], resolves the object stored at argv[1]
 * and passes both to scanGenericCommand().
 *
 * Returns C_ERR when parseScanCursor() fails, when the key lookup yields
 * NULL, or when checkType() reports a non-zero result for OBJ_ZSET.
 * The cursor is parsed before the key is looked up. Returns C_OK once
 * scanGenericCommand() has been invoked. */
int zscanCommand ( client *c )
{
    unsigned long cursor;
    robj *zobj;

    if ( parseScanCursor (c, c->argv[2], &cursor) == C_ERR ) return C_ERR;

    zobj = lookupKeyRead (c->db, c->argv[1]);
    if ( zobj == NULL ) return C_ERR;
    if ( checkType (c, zobj, OBJ_ZSET) ) return C_ERR;

    scanGenericCommand (c, zobj, cursor);
    return C_OK;
}
